data-driven-docs

Self-training repo


Project maintained by ggranados Hosted on GitHub Pages — Theme by mattgraham

Reactor Core



Reactor is a fourth-generation reactive library, based on the Reactive Streams specification, for building non-blocking applications on the JVM.

Reactor 3 requires Java 8 or later to run.

Dependencies

Maven

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.9</version>
</dependency>

Gradle

dependencies {
    implementation "io.projectreactor:reactor-core:3.5.9"
}

Log Dependencies

We also add Logback as a dependency, because we will log Reactor’s output in order to better understand the flow of data.

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.4.8</version>
</dependency>

dependencies {
    implementation 'ch.qos.logback:logback-classic:1.4.8'
}

Reactive Data Types

Reactor Core gives us two data types that enable us to produce a stream of data: Flux and Mono.

In Project Reactor, Flux and Mono are both Publishers. They represent streams of data that emit elements or signals over time.

Both Flux and Mono implement the Reactive Streams Publisher interface and comply with the specification, so we could use this interface in their place:

Publisher<String> just = Mono.just("foo");

Flux

Mono

Producing a Stream of Data

In order for an application to be reactive, the first thing it must be able to do is to produce a stream of data.

Without this data, we wouldn’t have anything to react to.

Using Flux
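
As a minimal sketch of producing data with Flux, the factory methods below each build a small, finite stream. The class name FluxCreationSketch and its helper methods are illustrative assumptions, not part of Reactor; collectList().block() is used here only to make the emitted elements visible from a plain main method.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; only the Flux factory methods come from Reactor.
public class FluxCreationSketch {

    // A Flux built from a fixed set of values.
    public static List<Integer> fromValues() {
        return Flux.just(1, 2, 3).collectList().block();
    }

    // A Flux built from an existing collection.
    public static List<String> fromCollection() {
        return Flux.fromIterable(Arrays.asList("a", "b")).collectList().block();
    }

    // A Flux of consecutive integers: start at 5, emit 3 elements.
    public static List<Integer> fromRange() {
        return Flux.range(5, 3).collectList().block();
    }

    // A Flux that completes without emitting anything.
    public static List<Object> empty() {
        return Flux.empty().collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(fromValues());     // [1, 2, 3]
        System.out.println(fromCollection()); // [a, b]
        System.out.println(fromRange());      // [5, 6, 7]
        System.out.println(empty());          // []
    }
}
```

Blocking is convenient in a demo, but in reactive code the usual approach is to subscribe instead.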

Using Mono

Mono is typically used for producing a stream of zero or one element in Project Reactor. While it might not seem as common as using Flux, there are still scenarios where you would use Mono to represent a single result or outcome.
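
The zero-or-one behaviour described above can be sketched as follows. MonoCreationSketch and its method names are hypothetical; the Reactor calls used are Mono.just, Mono.empty, Mono.justOrEmpty, Mono.fromCallable, block and blockOptional.

```java
import java.util.Optional;
import reactor.core.publisher.Mono;

// Hypothetical demo class showing the "zero or one element" nature of Mono.
public class MonoCreationSketch {

    // A Mono that emits exactly one value.
    public static String singleValue() {
        return Mono.just("foo").block();
    }

    // A Mono that completes without a value; blockOptional() yields an empty Optional.
    public static Optional<String> noValue() {
        return Mono.<String>empty().blockOptional();
    }

    // A Mono that emits the Optional's content, or completes empty if there is none.
    public static Optional<String> fromOptional(Optional<String> maybe) {
        return Mono.justOrEmpty(maybe).blockOptional();
    }

    // A Mono whose value is computed lazily, when someone subscribes.
    public static Integer deferred() {
        return Mono.fromCallable(() -> 6 * 7).block();
    }

    public static void main(String[] args) {
        System.out.println(singleValue());                  // foo
        System.out.println(noValue().isPresent());          // false
        System.out.println(fromOptional(Optional.of("x"))); // Optional[x]
        System.out.println(deferred());                     // 42
    }
}
```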

Why Not Only Flux?

You might wonder why you shouldn’t just use Flux for all cases. After all, Flux can represent zero, one, or multiple elements, which seems to cover all possibilities. However, there are reasons to prefer Mono in certain situations. For example, a Mono return type tells the caller that at most one element will ever arrive.

While Flux is more versatile and can handle a wide range of scenarios, using Mono when appropriate can lead to clearer and more maintainable code.

Subscribing to a Stream

Subscribing to a stream in Project Reactor is quite straightforward. The subscribe() method initiates the subscription and starts processing the data emitted by the stream.
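
As a rough illustration, subscribe() also has an overload that takes separate callbacks for values, errors, and completion. The sketch below records each signal in a list; the SubscribeSketch class and the event strings are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class: subscribes with one callback per signal type.
public class SubscribeSketch {

    public static List<String> run() {
        List<String> events = new ArrayList<>();

        Flux.just("a", "b").subscribe(
                value -> events.add("next:" + value),             // called once per element
                error -> events.add("error:" + error.getMessage()), // called if the stream fails
                () -> events.add("complete"));                    // called when the stream ends normally

        // Flux.just emits synchronously, so the list is already filled here.
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [next:a, next:b, complete]
    }
}
```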

Collecting Elements

List<Integer> elements = new ArrayList<>();

Flux.just(1, 2, 3, 4)
    .log()
    .subscribe(elements::add);

assertThat(elements).containsExactly(1, 2, 3, 4);

The Flow of Elements

With logging in place, we can use it to visualize how the data is flowing through our stream:

20:25:19.550 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | request(unbounded)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(3)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(4)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onComplete()

This is the flow laid out in the Subscriber interface as part of the Reactive Streams specification, and in reality, that’s what’s been instantiated behind the scenes in our call to subscribe(). It’s a useful method, but to better understand what’s happening, let’s provide a Subscriber implementation directly:

List<Integer> elements = new ArrayList<>();

Flux.just(1, 2, 3, 4)
    .log()
    .subscribe(new Subscriber<Integer>() {

        @Override
        public void onSubscribe(Subscription s) {
            // Ask for everything at once (unbounded demand)
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer integer) {
            logger.info("Consuming item: {}", integer);
            elements.add(integer);
        }

        @Override
        public void onError(Throwable t) {
            // Pass the throwable itself so the stack trace is logged
            logger.error("An error occurred", t);
        }

        @Override
        public void onComplete() {
            logger.info("Data stream complete");
        }
    });

logger.info(elements.toString());

Output:

2023-08-17 17:22:03 INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2023-08-17 17:22:03 INFO  reactor.Flux.Array.1 - | request(unbounded)
2023-08-17 17:22:03 INFO  reactor.Flux.Array.1 - | onNext(1)
2023-08-17 17:22:03 INFO  c.e.r.ReactorDemoApplication - Consuming item:1
2023-08-17 17:22:03 INFO  reactor.Flux.Array.1 - | onNext(2)
2023-08-17 17:22:03 INFO  c.e.r.ReactorDemoApplication - Consuming item:2
2023-08-17 17:22:03 INFO  reactor.Flux.Array.1 - | onNext(3)
2023-08-17 17:22:03 INFO  c.e.r.ReactorDemoApplication - Consuming item:3
2023-08-17 17:22:03 INFO  reactor.Flux.Array.1 - | onNext(4)
2023-08-17 17:22:03 INFO  c.e.r.ReactorDemoApplication - Consuming item:4
2023-08-17 17:22:03 INFO  reactor.Flux.Array.1 - | onComplete()
2023-08-17 17:22:03 INFO  c.e.r.ReactorDemoApplication - Data stream complete
2023-08-17 17:22:03 INFO  c.e.r.ReactorDemoApplication - [1, 2, 3, 4]

We can see that each possible stage in the above flow maps to a method in the Subscriber implementation. It just happens that Flux has provided us with a helper method to reduce this verbosity.

Comparison to Java 8 Streams

It might appear that we have something equivalent to a Java 8 Stream doing a collect:

List<Integer> collected = Stream.of(1, 2, 3, 4)
    .collect(toList());

But we don’t.

The core difference is that Reactive is a push model, whereas Java 8 Streams are a pull model. In a reactive approach, events are pushed to the subscribers as they come in.

The next thing to notice is that a Streams terminal operator is just that, terminal: it pulls all the data and returns a result. With Reactive, we could have an infinite stream coming in from an external resource, with multiple subscribers attached and removed on an ad hoc basis. We can also do things like combine streams, throttle streams, and apply backpressure.

Backpressure

Backpressure is when a downstream can tell an upstream to send it less data in order to prevent it from being overwhelmed.

Let’s tell the upstream to only send two elements at a time by using request():

 List<Integer> elements = new ArrayList<>();

 Flux.just(1, 2, 3, 4)
        .log()
        .subscribe(new Subscriber<Integer>() {
            private Subscription s;
            int onNextAmount;
            
            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
                s.request(2);
            }
        
            @Override
            public void onNext(Integer integer) {
                elements.add(integer);
                onNextAmount++;
                if (onNextAmount % 2 == 0) {
                    s.request(2);
                }
            }
        
            @Override
            public void onError(Throwable t) {}
        
            @Override
            public void onComplete() {}
        });

Output:

23:31:15.395 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:31:15.397 [main] INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.397 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onNext(3)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onNext(4)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onComplete()

Essentially, this is reactive pull backpressure. We are requesting the upstream to only push a certain number of elements, and only when we are ready.
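
The same request-two-at-a-time idea can also be sketched with Reactor’s BaseSubscriber, which saves implementing the full Subscriber interface. This is an illustrative variant, not the approach used above: BatchedDemandSketch, consumeInBatches and recordedRequests are hypothetical names, and doOnRequest is used only to observe the demand that reaches the source.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical demo class: consumes a Flux in fixed-size batches.
public class BatchedDemandSketch {

    // Subscribes to the source and requests batchSize elements at a time.
    public static List<Integer> consumeInBatches(Flux<Integer> source, int batchSize) {
        List<Integer> received = new ArrayList<>();
        source.subscribe(new BaseSubscriber<Integer>() {
            private int seenInBatch;

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(batchSize); // initial demand
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
                if (++seenInBatch == batchSize) {
                    seenInBatch = 0;
                    request(batchSize); // ask for the next batch only when ready
                }
            }
        });
        // The sources used here emit synchronously, so the list is complete on return.
        return received;
    }

    // Records every request amount seen by a range source of the given size.
    public static List<Long> recordedRequests(int count, int batchSize) {
        List<Long> requests = new ArrayList<>();
        consumeInBatches(Flux.range(1, count).doOnRequest(n -> requests.add(n)), batchSize);
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(consumeInBatches(Flux.range(1, 5), 2)); // [1, 2, 3, 4, 5]
        System.out.println(recordedRequests(5, 2));                // [2, 2, 2]
    }
}
```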

Operating on a Stream

We can also perform operations on the data in our stream:

import reactor.core.publisher.Flux;

public class MapExample {
  public static void main(String[] args) {
    Flux<Integer> numbers = Flux.range(1, 5);

    // Transform each element into its square
    Flux<Integer> squaredNumbers = numbers.map(n -> n * n);

    squaredNumbers.subscribe(System.out::println);
  }
}

import reactor.core.publisher.Flux;

public class FilterExample {
  public static void main(String[] args) {
    Flux<Integer> numbers = Flux.range(1, 5);

    Flux<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);

    evenNumbers.subscribe(System.out::println);
  }
}

import reactor.core.publisher.Flux;

public class FlatMapExample {
  public static void main(String[] args) {
    Flux<Integer> numbers = Flux.range(1, 3);

    Flux<Integer> doubledNumbers = numbers.flatMap(n -> Flux.just(n, n * 2));

    doubledNumbers.subscribe(System.out::println);
  }
}

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReduceExample {
  public static void main(String[] args) {
    Flux<Integer> numbers = Flux.range(1, 5);

    Mono<Integer> sum = numbers.reduce(0, (a, b) -> a + b);

    sum.subscribe(System.out::println);
  }
}

Combining Two Streams

import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

public class ZipWithExample {
  public static void main(String[] args) {
    Flux<Integer> stream1 = Flux.range(1, 5);
    Flux<Integer> stream2 = Flux.range(6, 5);

    Flux<Tuple2<Integer, Integer>> zippedStream = stream1.zipWith(stream2);

    zippedStream.subscribe(tuple -> System.out.println("Tuple: " + tuple));
  }
}

In this example, zipWith combines elements from stream1 and stream2 into tuples of two elements. Each tuple contains one element from stream1 and one element from stream2. The resulting tuples are emitted in the zippedStream.

The output will be:

Tuple: [1,6]
Tuple: [2,7]
Tuple: [3,8]
Tuple: [4,9]
Tuple: [5,10]

Dealing with Time

Project Reactor provides a variety of operators and tools for dealing with time in reactive programming. These operators allow you to work with time-related aspects, such as delaying emissions, setting timeouts, scheduling tasks, and more.

Common time-related operators and techniques you can use in Reactor:
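
As a hedged sketch of three such operators, the example below delays elements, emits periodic ticks, and applies a timeout with a fallback. TimeOperatorsSketch and its method names are hypothetical, and the durations are arbitrary small values chosen so the demo finishes quickly.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class for a few time-related operators.
public class TimeOperatorsSketch {

    // delayElements: each element is held back by the given duration.
    public static List<Integer> delayed() {
        return Flux.range(1, 3)
                .delayElements(Duration.ofMillis(50))
                .collectList()
                .block();
    }

    // interval: emits 0, 1, 2, ... at a fixed period; take(3) makes it finite.
    public static List<Long> ticks() {
        return Flux.interval(Duration.ofMillis(20))
                .take(3)
                .collectList()
                .block();
    }

    // timeout: signals an error if no element arrives in time;
    // onErrorReturn then substitutes a default value.
    public static String withTimeout() {
        return Mono.just("slow")
                .delayElement(Duration.ofMillis(500))
                .timeout(Duration.ofMillis(50))
                .onErrorReturn("fallback")
                .block();
    }

    public static void main(String[] args) {
        System.out.println(delayed());     // [1, 2, 3]
        System.out.println(ticks());       // [0, 1, 2]
        System.out.println(withTimeout()); // fallback
    }
}
```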

These are just a few examples of how Reactor enables you to work with time in reactive programming.

Choose the Appropriate Operator

Hot Streams

So far, we’ve focused primarily on cold streams. The ones we used are static, fixed-length streams that are easy to deal with, and each subscriber receives the data from the start. A more realistic use case for reactive might be something that happens infinitely.

These types of streams are called hot streams, as they are always running and can be subscribed to at any point in time, missing the start of the data.

Creating a ConnectableFlux

ConnectableFlux is a special type of Flux provided by Project Reactor that allows you to control the timing of the subscription to the underlying data source. It is often used in scenarios where you want to share a single source of data among multiple subscribers, but you want to control when the actual data flow starts.

ConnectableFlux doesn’t start emitting data to subscribers as soon as you subscribe to it. Instead, it requires an explicit call to the connect() method to begin emitting data to all subscribed consumers simultaneously.

Let’s create a Flux that lasts forever, outputting the results to the console, which would simulate an infinite stream of data coming from an external resource:

ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
          while(true) {
              fluxSink.next(System.currentTimeMillis());
          }
        })
        .publish();

By calling publish() we are given a ConnectableFlux. This means that calling subscribe() won’t cause it to start emitting, allowing us to add multiple subscriptions:

publish.subscribe(System.out::println);        
publish.subscribe(System.out::println);

If we try running this code, nothing will happen. It’s not until we call connect() that the Flux will start emitting:

publish.connect();
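
Because the stream above never ends, its behaviour is hard to check in a small program. The sketch below shows the same subscribe-then-connect sequence with a finite source, so the effect of connect() can be observed; ConnectableFluxSketch and the event labels are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

// Hypothetical demo class: two subscribers share one source via publish().
public class ConnectableFluxSketch {

    public static List<String> run() {
        List<String> events = new ArrayList<>();

        ConnectableFlux<Integer> shared = Flux.range(1, 2).publish();

        // Subscribing alone does not start the source.
        shared.subscribe(n -> events.add("A:" + n));
        shared.subscribe(n -> events.add("B:" + n));
        events.add("before connect");

        // connect() subscribes to the source once; both subscribers see every element.
        shared.connect();
        return events;
    }

    public static void main(String[] args) {
        // "before connect" is printed first, followed by the elements for A and B.
        run().forEach(System.out::println);
    }
}
```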

Throttling

Throttling in reactive programming refers to controlling the rate at which events are emitted or processed.

Here are a few throttling operators you can use:
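
One possible illustration is sample(Duration), which forwards only the most recent element seen in each time window. In this sketch, ThrottlingSketch is a hypothetical class name and the 10 ms and 100 ms values are arbitrary; the exact elements that survive depend on timing, so no output is listed.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class: reduces a fast stream to periodic samples.
public class ThrottlingSketch {

    public static List<Long> sampled() {
        return Flux.interval(Duration.ofMillis(10)) // fast source, one tick every 10 ms
                .take(30)                           // keep the demo finite
                .sample(Duration.ofMillis(100))     // keep only the latest tick per 100 ms window
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        List<Long> result = sampled();
        // Far fewer than the 30 source elements reach the subscriber.
        System.out.println(result.size() + " of 30 elements kept: " + result);
    }
}
```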

These are just a few examples of how you can use throttling operators in Project Reactor to control the rate of emissions in your reactive streams.

Concurrency

So far, our examples have mostly run on the main thread. However, we can control which thread our code runs on if we want.

Concurrency management in Project Reactor is achieved through its scheduler system. Reactor provides a scheduler abstraction that allows you to control the execution of reactive streams on different threads and thread pools.

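Some common implementations include Schedulers.immediate(), Schedulers.single(), Schedulers.parallel(), and Schedulers.boundedElastic().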

Example

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SchedulersExample {
    public static void main(String[] args) throws InterruptedException {
        Flux<Integer> flux = Flux.range(1, 10);

        flux
            .map(value -> {
                System.out.println("Mapping on thread: " + Thread.currentThread().getName());
                return value * 2;
            })
            .subscribeOn(Schedulers.parallel())
            .subscribe(value -> System.out.println("Received value: " + value));

        // Wait for a moment to allow emissions
        Thread.sleep(1000);
    }
}

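In this example, subscribeOn(Schedulers.parallel()) moves the subscription, and with it the map operation, off the main thread and onto a worker thread from the parallel scheduler’s pool. Note that this alone does not split the work across several threads: the elements of a single Flux are still processed sequentially on that one worker.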

The subscribeOn operator indicates on which scheduler the whole stream should start executing.

The choice of scheduler depends on the nature of your operations and the concurrency requirements of your application. Reactor’s scheduler system provides you with tools to manage concurrency effectively and efficiently while maintaining the principles of reactive programming.
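
To make the threading visible, the following sketch records which threads actually run each step. SchedulerThreadsSketch and its methods are hypothetical; publishOn, which does not appear in the example above, is Reactor’s operator for switching threads for the steps downstream of it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class: observes thread names under subscribeOn and publishOn.
public class SchedulerThreadsSketch {

    // subscribeOn: the whole chain, including map, runs on one scheduler worker.
    public static Set<String> mapThreadsWithSubscribeOn() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flux.range(1, 10)
                .map(n -> {
                    threads.add(Thread.currentThread().getName());
                    return n * 2;
                })
                .subscribeOn(Schedulers.parallel())
                .blockLast();
        return threads;
    }

    // publishOn: only the steps after it move to the given scheduler.
    public static String[] threadsAroundPublishOn() {
        String[] names = new String[2];
        Flux.range(1, 3)
                .map(n -> {
                    names[0] = Thread.currentThread().getName(); // before publishOn
                    return n;
                })
                .publishOn(Schedulers.boundedElastic())
                .map(n -> {
                    names[1] = Thread.currentThread().getName(); // after publishOn
                    return n;
                })
                .blockLast();
        return names;
    }

    public static void main(String[] args) {
        // A single thread name from the parallel scheduler.
        System.out.println(mapThreadsWithSubscribeOn());
        String[] names = threadsAroundPublishOn();
        System.out.println("upstream: " + names[0] + ", downstream: " + names[1]);
    }
}
```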

Error Handling

Error handling is an important aspect of reactive programming, and Project Reactor provides several operators and techniques to handle errors gracefully in your reactive streams. Here’s an overview of error handling mechanisms in Reactor:
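
A minimal sketch of three common options follows: a fallback value, a fallback stream, and a retry. ErrorHandlingSketch, failingSource and the other method names are hypothetical; the division by zero is just a convenient way to trigger an error mid-stream.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class for a few error-handling operators.
public class ErrorHandlingSketch {

    // Emits 10 and 5, then fails with an ArithmeticException on the zero.
    static Flux<Integer> failingSource() {
        return Flux.just(1, 2, 0, 4).map(n -> 10 / n);
    }

    // onErrorReturn: replace the error with a single default value, then complete.
    public static List<Integer> withFallbackValue() {
        return failingSource().onErrorReturn(-1).collectList().block();
    }

    // onErrorResume: switch to another publisher when an error occurs.
    public static List<Integer> withFallbackStream() {
        return failingSource()
                .onErrorResume(error -> Flux.just(100, 200))
                .collectList()
                .block();
    }

    // retry: resubscribe to the source after a failure, here at most twice.
    public static String withRetry() {
        AtomicInteger attempts = new AtomicInteger();
        Mono<String> flaky = Mono.fromCallable(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("attempt failed");
            }
            return "ok";
        });
        return flaky.retry(2).block() + " after " + attempts.get() + " attempts";
    }

    public static void main(String[] args) {
        System.out.println(withFallbackValue());  // [10, 5, -1]
        System.out.println(withFallbackStream()); // [10, 5, 100, 200]
        System.out.println(withRetry());          // ok after 3 attempts
    }
}
```

In each case the elements after the failure (here the 4) are never emitted, because an error terminates the original sequence.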

These are just a few examples of how you can handle errors in reactive programming using Project Reactor. Error handling is crucial to ensure the robustness and stability of your reactive applications.

Resource Management

Resource management is a crucial aspect of reactive programming, as it’s important to ensure that resources such as file handles, network connections, and database connections are properly managed and released when they are no longer needed.

Project Reactor provides mechanisms to manage resources effectively in a reactive context.
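
One such mechanism is Flux.using, which ties a resource’s lifetime to a stream: it opens the resource on subscription and runs a cleanup callback when the stream terminates or is cancelled. In the sketch below, FakeConnection is an invented stand-in for a real file handle or connection, and ResourceCleanupSketch is a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class: acquires, uses, and releases a resource with Flux.using.
public class ResourceCleanupSketch {

    // Invented resource type that logs when it is opened and closed.
    static class FakeConnection {
        private final List<String> log;

        FakeConnection(List<String> log) {
            this.log = log;
            log.add("open");
        }

        List<String> rows() {
            return Arrays.asList("row1", "row2");
        }

        void close() {
            log.add("close");
        }
    }

    public static List<String> run() {
        List<String> log = new ArrayList<>();

        Flux.<String, FakeConnection>using(
                        () -> new FakeConnection(log),          // acquire the resource
                        conn -> Flux.fromIterable(conn.rows()), // build the stream from it
                        FakeConnection::close)                  // release it on termination
                .doOnNext(log::add)
                .blockLast();

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [open, row1, row2, close]
    }
}
```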

When working with reactive programming and managing resources, it’s important to consider the specific characteristics of your resources, the concurrency model, and the operators you use. By leveraging Reactor’s built-in resource management features and adopting best practices, you can effectively manage resources in your reactive applications.

