Reactor Core
Table of Contents
- Reactor Core
Reactor is a fourth-generation reactive library, based on the Reactive Streams specification, for building non-blocking applications on the JVM
- See also: Java 9 Reactive Streams
Reactor 3 requires Java 8 or + to run
Dependencies
Maven
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.9</version>
</dependency>
Gradle
dependencies {
compile "io.projectreactor:reactor-core:3.5.9"
}
Reactive Data Types
Reactive Core gives us two data types that enable us to produce a stream of data, Flux and Mono.
Both, Flux and Mono are implementations of the Reactive Streams Publisher interface. Both classes are compliant with the specification, and we could use this interface in their place:
Publisher<String> just = Mono.just("foo");
Flux
-
Fluxis used to represent a stream of zero to many elements (0-n). -
It is similar to a collection, but it’s designed to handle asynchronous and non-blocking scenarios.
-
A
Fluxcan emit any number of items over time, including zero items. -
It supports various reactive operations like transformation, filtering, mapping, combining, and more.
-
Examples of sources for creating a
Fluxinclude lists, arrays, generators, intervals, and more.
Mono
-
Monois used to represent a stream of zero or one element. -
It is conceptually similar to an
Optionalin Java, but it’s designed for reactive and asynchronous use cases. -
A
Monocan emit either a single item or no item at all (an empty signal) (0-1). -
It is often used for representing the outcome of an asynchronous operation, such as a database query or a network call.
-
Like
Flux,Monoalso supports various reactive operations.
Producing a Stream of Data
In order for an application to be reactive, the first thing it must be able to do is to produce a stream of data.
Without this data, we wouldn’t have anything to react to
Using Flux
-
Flux.just()
You can create a
Fluxthat emits a predefined sequence of values using theFlux.just()method.import reactor.core.publisher.Flux; public class FluxDemo { public static void main(String[] args) { Flux<String> flux = Flux.just("apple", "banana", "cherry", "date"); flux.subscribe(System.out::println); } }
-
Flux.fromIterable()
You can create a
Fluxfrom an existing iterable, such as a list or a collection.import reactor.core.publisher.Flux; import java.util.Arrays; import java.util.List; public class FluxDemo { public static void main(String[] args) { List<String> fruits = Arrays.asList("apple", "banana", "cherry", "date"); Flux<String> flux = Flux.fromIterable(fruits); flux.subscribe(System.out::println); } }
-
Flux.generate()
You can generate a stream of data using a custom stateful generator function.
import reactor.core.publisher.Flux; public class FluxDemo { public static void main(String[] args) { Flux<Integer> flux = Flux.generate( () -> 0, (state, sink) -> { sink.next(state); if (state == 5) { sink.complete(); } return state + 1; } ); flux.subscribe(System.out::println); } }-
Flux.generate(state,generator): Creates a Flux using the generate method. The generate method takes two arguments: an initial state (provided by a supplier) and a generator function (BiFunction). The generator function generates values and provides them to the sink while maintaining some state. -
() -> 0: This is the initial state supplier. It returns an initial state of 0. -
(state, sink) -> {}: This is the generator function. It takes the current state and a sink as its arguments. The sink is used to emit values, signals, and control the flow of theFlux. -
sink.next(state): Emits the current state value through the sink. -
if (state == 5) { sink.complete(); }: Checks if the current state is equal to 5. If so, it completes the Flux by callingsink.complete(). This ensures that the stream terminates after emitting the value 5. -
return state + 1: Updates the state for the next iteration by incrementing it.
-
-
Flux.interval()
You can create a stream that emits elements at a regular interval.
import reactor.core.publisher.Flux; import java.time.Duration; public class FluxDemo { public static void main(String[] args) { Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)); flux.subscribe(System.out::println); // Sleep for a while to allow some emissions try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }
-
Flux.range()
You can create a stream that emits a range of integers.
import reactor.core.publisher.Flux; public class FluxDemo { public static void main(String[] args) { Flux<Integer> flux = Flux.range(1, 5); flux.subscribe(System.out::println); } }
Using Mono
Mono is typically used for producing a stream of zero or one element in Project Reactor. While it might not seem as common as using Flux, there are still scenarios where you would use Mono to represent a single result or outcome.
-
Mono.just()
You can create a
Monothat emits a single value using the Mono.just() method.import reactor.core.publisher.Mono; public class MonoDemo { public static void main(String[] args) { Mono<String> mono = Mono.just("Hello, world!"); mono.subscribe(System.out::println); } }
-
Mono.empty()
You can create an empty
Monothat doesn’t emit any value. This is useful for representing the absence of a result.import reactor.core.publisher.Mono; public class MonoDemo { public static void main(String[] args) { Mono<Object> emptyMono = Mono.empty(); emptyMono.subscribe( value -> System.out.println("Received value: " + value), error -> System.err.println("Error: " + error), () -> System.out.println("Completed") ); } }
-
Mono.defer()
You can use
Mono.defer()to create aMonothat is generated on each subscription. This can be useful when you want to create a new instance ofMonowith potentially different behavior for each subscriber.import reactor.core.publisher.Mono; public class DynamicMonoDemo { public static void main(String[] args) { // Create a supplier that generates a Mono with a random number Mono<Integer> dynamicMono = Mono.defer(() -> Mono.just((int) (Math.random() * 100))); // Subscribe and print a different random number on each subscription dynamicMono.subscribe(value -> System.out.println("Subscriber 1: " + value)); dynamicMono.subscribe(value -> System.out.println("Subscriber 2: " + value)); } }In the current code there’s no practical difference between using
Mono.defer(() -> Mono.just((int) (Math.random() * 100)))and simply usingMono.just((int) (Math.random() * 100)))directly, but there is a subtle but important difference between them:- Using Mono.just() directly:
Mono<Integer> dynamicMono = Mono.just((int) (Math.random() * 100));In this version, you are creating a single Mono instance that generates a random number between 0 and 100 at the time of creation. This random number is then emitted to all subscribers of this single Mono. Both subscribers will receive the same random value because the Mono emits the value only once when it is created.
- Using Mono.defer():
Mono<Integer> dynamicMono = Mono.defer(() -> Mono.just((int) (Math.random() * 100)));In this version, you are using Mono.defer() to create a new Mono instance for each subscription. Each time a subscriber subscribes to dynamicMono, the lambda inside Mono.defer() is executed, resulting in a new random value being generated and emitted separately for each subscriber. As a result, each subscriber receives a different random value.
- Using Mono.just() directly:
-
Mono.fromSupplier()
You can create a
Monofrom a supplier function that provides the value to be emitted.import reactor.core.publisher.Mono; public class MonoDemo { public static void main(String[] args) { Mono<String> mono = Mono.fromSupplier(() -> "Value from supplier"); mono.subscribe(System.out::println); } }
-
Mono.fromCallable()
You can create a
Monofrom a callable that represents a potentially blocking operation.import reactor.core.publisher.Mono; public class MonoDemo { public static void main(String[] args) { Mono<Integer> mono = Mono.fromCallable(() -> { // Simulate a potentially blocking operation Thread.sleep(1000); return 42; }); mono.subscribe(System.out::println); } }
Why Not Only Flux?
You might wonder why you shouldn’t just use Flux for all cases. After all, Flux can represent zero, one, or multiple elements, which seems to cover all possibilities. However, there are reasons to use Mono in certain situations:
-
Semantic Clarity: Using
Monowhen you expect at most one result or outcome can make your code more semantically clear. -
Error Handling: In a
Mono, errors terminate the stream immediately, while in aFlux, other items can still be emitted after an error. -
Synchronization and Composition: In some scenarios, you might need to wait for a single result before proceeding with subsequent operations. Using
Monoallows for clear synchronization points and more straightforward composition in such cases. -
Resource Management: When dealing with resources that need explicit management, such as connections or files, a
Monocan be a better fit. It ensures that the resource is released promptly after it’s no longer needed. -
Backpressure Considerations: If you’re working with downstream systems that are sensitive to backpressure (rate at which data is consumed), using
Monoensures a strict one-on-one relationship without the complexity of backpressure management that comes withFlux. -
API Contract: Some libraries and APIs might expect or return
Monoinstances for certain operations.
While Flux is more versatile and can handle a wide range of scenarios, using Mono when appropriate can lead to clearer and more maintainable code.
Subscribing to a Stream
Collecting Elements
The Flow of Elements
Comparison to Java 8 Streams
Backpressure
Operating on a Stream
Mapping Data in a Stream
Combining Two Streams
Hot Streams
Creating a ConnectableFlux
Throttling
Concurrency
Ref.
- https://projectreactor.io/
- https://github.com/reactor/reactor-core
- https://www.baeldung.com/reactor-core