data-driven-docs

Selft training repo


Project maintained by ggranados Hosted on GitHub Pages — Theme by mattgraham

Reactive Programming in Java


Table of Contents


Reactive Streams Specification

Is a set of interfaces and rules that define a standard for asynchronous stream processing with non-blocking backpressure in Java. It was developed to address challenges in handling asynchronous and potentially unbounded streams of data, ensuring that data producers (publishers) don’t overwhelm data consumers (subscribers) and lead to resource exhaustion.

The specification also defines rules and guidelines to ensure proper behavior and interoperability between different implementations. One of the key aspects of the specification is backpressure handling, which ensures that subscribers can signal their demand for items to publishers and prevent data overload.

The Reactive Streams Specification doesn’t provide a concrete implementation but serves as a blueprint for building reactive libraries and frameworks. Libraries like Project Reactor, Akka Streams, RxJava, and others adhere to the Reactive Streams Specification to provide standardized asynchronous stream processing with backpressure support in Java.

By adhering to the Reactive Streams Specification, different reactive libraries can interoperate seamlessly, and users can switch between libraries without significant code changes, ensuring a consistent and standardized approach to reactive programming in Java.

Back to top

Why Reactive Programming in Java

As a Java API developer, delays or multiple concurrent users are usually abstracted away since libraries like Spring MVC handle concurrency, we tend to code as if it were a single request. To do this, we pay with sequential blocking operations, and latent threads.

In Java backend development, reactive programming can offer several benefits:

Back to top

Concurrency API vs Reactive API

Check the following piece of code and try to tell what’s the problem with it?

@GetMapping("users/{userId}")
public User getUserDetails(@PathVariable String userId){
    User user = userService.getUser(userId);
    UserPreferences prefs = userPreferences.getPreferences(userId);
    user.setPreferences(prefs);
    return user;
}

This REST GET operation implementation straightforward, but there are potential problems and considerations that need to be addressed:

Back to top

Efficiency and Performance

In this implementation, two separate database calls are being made: one to retrieve the user details and another to retrieve user preferences. This could lead to performance issues, especially if the userService.getUser() and userPreferences.getPreferences() methods involve costly database queries or remote service calls. Making multiple calls like this can significantly impact the response time of the API, leading to slower user experiences.

Back to top

Asynchronous Calls

If the data retrieval involves time-consuming operations, consider using asynchronous calls to parallelize the tasks and improve overall response time

Consider now this implementation using CompletableFuture

@GetMapping("users/{userId}")
public CompletableFuture<User> getUserDetailsAsync(@PathVariable String userId) {
      CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> userService.getUser(userId));
      CompletableFuture<UserPreferences> prefsFuture = CompletableFuture.supplyAsync(() -> userPreferences.getPreferences(userId));

      CompletableFuture<Void> bothFutures = CompletableFuture.allOf(userFuture ,prefsFuture );

      bothFutures.join();
      User user = userFuture.join();
      UserPreferences prefs = prefsFuture.join();

      user.setPreferences(prefs);
      return user;
}

While the provided code does use CompletableFuture for asynchronous operations, there are still aspects that might impact its efficiency and performance:

Basically Dev has too much to handle, code is meesy, and bothFutures.join() is still blocking the main thread

Back to top

Reactive version

Consider the following code improvement using Project Reactor’s reactive programming model

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// ...

@GetMapping("users/{userId}")
public Mono<User> getUserDetailsAsync(@PathVariable String userId) {
    return userService.get(userId)                        // Fetch user details asynchronously
        .zipWith(userPreferencesService.getPreferences(userId))   // Zip with user preferences
        .map(tuple -> {                                     // Process the zipped result
            User user = tuple.getT1();                     // Extract user from tuple
            UserPreferences prefs = tuple.getT2();          // Extract preferences from tuple
            user.setPreferences(prefs);                     // Set user preferences
            return user;                                    // Return the updated user
        });
}

Mono<User> means that the method getUserDetailsAsync returns a reactive stream that will eventually emit a single User object

This code uses Project Reactor’s reactive programming model to asynchronously fetch user details and preferences, combine the results using the zipWith operator, and then update the user’s preferences before returning the updated User object. It’s a concise and efficient way to handle asynchronous operations and data combination using reactive principles.

Project Reactor’s implementation offers several advantages over using CompletableFuture. But related to Efficient Concurrency:

Reactor’s design emphasizes efficient concurrency and resource management, allowing you to *handle large numbers of concurrent tasks with a smaller number of threads, thanks to its non-blocking nature and cooperative multitasking.

See also: Project Reactor

Back to top

Reactive Libraries for Java

Some of the most popular and recommended libraries for reactive programming in Java are:

Back to top


Ref.


Get Started | Paradigms | Reactive Programming | Java