reactor hot publisher

Mono vs Flux In Project Reactor

Overview:

In this tutorial, we will learn more about Mono vs Flux in Project Reactor (Java Reactive Programming). If you have not read the previous article on introduction to reactive programming, then read that here.

Java 8 – Optional vs Stream:

Java, as part part of the release 8, had introduced stream and optional.

  • Stream is a pipeline of computational operations through which 0 . . . . N elements are conveyed from a data source to produce desired result.
  • Optional is same like stream. But it is 0 or 1 element.

Stream Pipeline:

Lets first see how stream works.

//simple list
List<Integer> intList = Arrays.asList(1,2,3,4,5);
//creating stream from list
Stream<Integer> intStream = intList.stream()
                                    .filter(i -> i > 2 ) //filter elements which > 2
                                    .map(i -> i * i); //convert i into its square
//collect all the squares into a separate list
List<Integer> list1 = intStream
        .collect(Collectors.toList());  
//print
System.out.println(list1);

//output
[9, 16, 25]

Now lets add this statement to collect the calculated squares into list2 and see what happens

List<Integer> list2 = intStream
        .collect(Collectors.toList());

It will throw below exception!!!

java.lang.IllegalStateException: stream has already been operated upon or closed

Optional Pipeline:

An optional pipeline will look more or less like this.

Optional<Integer> optional = Optional.of(1)
                .filter(i -> i > 2)
                .map(i -> i * 2);

Note:

  • A stream pipeline can have N number of intermediate operations. But it can have only one terminal operator. That’s why the second collect did not work and throws the exception. Because the elements have been already collected from the stream into a list and the stream is closed.
  • Stream pipeline is synchronous.
  • Optional pipeline can have similar intermediate operations like stream and one terminal operator as well. However Optional can have 0 or 1 value.

Mono vs Flux:

If you have understood how Optional vs Stream works, then Mono vs Flux are similar in Java Reactive Programming with Project Reactor.

As per reactive-streams specification, A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). Reactor-core has a set of implementations of this Publisher interface. The 2 important implementations from which we would be creating sequences are Mono & Flux.

  • Mono: It will emit 0 or 1 element.
  • Flux: It will emit 0 or N element(s).

Flux:

Flux is an implementation of Publisher. It will emit 0 . . . N elements and/or a complete or an error call. (Image courtesy: project reactor site). Stream pipeline is synchronous whereas Flux pipeline is completely asynchronous. It will emit values only when there is a downstream subscriber.

mono vs flux

  • empty – To emit 0 element / or return empty Flux<T>
Flux.empty()
        .subscribe(i -> System.out.println("Received : " + i));

//No output
  • just – The easiest way to emit an element is using the just method.
    • subscribe method accepts a Consumer<T> where we define what we do with the emitted element.
Flux.just(1)
        .subscribe(i -> System.out.println("Received : " + i));

//Output
Received : 1
  • Unlike stream, Flux can have any number of Observers connected to the pipeline. We can also write like this if we need to connect more than 1 observers to a source.
    • 1 observer might be collecting all elements into a list while other observer could be logging the element details.
Flux<Integer> flux = Flux.just(1);
//Observer 1
flux.subscribe(i -> System.out.println("Observer-1 : " + i));
//Observer 2
flux.subscribe(i -> System.out.println("Observer-2 : " + i));

//Output
Observer-1 : 1
Observer-2 : 1
  • just with arbitrary elements
Flux.just('a', 'b', 'c')
        .subscribe(i -> System.out.println("Received : " + i));

//Output
Received : a
Received : b
Received : c
  • We can have multiple observer and each observer will the process the emitted elements independently. They might take their own time. Everything happens asynchronously.
    • The below output shows that the entire pipeline is executed asynchronously by default.
System.out.println("Starts");

//flux emits one element per second
Flux<Character> flux = Flux.just('a', 'b', 'c', 'd')
                            .delayElements(Duration.ofSeconds(1));
//Observer 1 - takes 500ms to process
flux
        .map(Character::toUpperCase)
        .subscribe(i -> {
            sleep(500);
            System.out.println("Observer-1 : " + i);
        });
//Observer 2 - process immediately
flux.subscribe(i -> System.out.println("Observer-2 : " + i));

System.out.println("Ends");

//Just to block the execution - otherwise the program will end only with start and end messages
Thread.sleep(10000);

//Output
Starts
Ends
Observer-2 : a
Observer-1 : A
Observer-2 : b
Observer-1 : B
Observer-2 : c
Observer-2 : d
Observer-1 : C
Observer-1 : D
  • In the above code, I added below log method to better understand the behavior.
    • We have 2 observers subscribed to the source. This is why we have onSubscribe method
    • request(32) – here 32 is the default buffer size. Observer requests for 32 elements to buffer/emit.
    • elements are emitted one-by-one.
    • Once all the elements are emitted. complete call is invoked to inform the observers not to expect any more elements.
Flux<Character> flux = Flux.just('a', 'b', 'c', 'd')
        .log()
        .delayElements(Duration.ofSeconds(1));

Output:

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(32)
[ INFO] (main) | onNext(a)
[ INFO] (main) | onNext(b)
[ INFO] (main) | onNext(c)
[ INFO] (main) | onNext(d)
[ INFO] (main) | onComplete()
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(32)
[ INFO] (main) | onNext(a)
[ INFO] (main) | onNext(b)
[ INFO] (main) | onNext(c)
[ INFO] (main) | onNext(d)
[ INFO] (main) | onComplete()
  • The subscribe method could accept other parameters as well to handle the error and completion calls. So far we have been consuming the elements received via the pipeline. But we could also get some unhandled exception. We can pass the handlers as shown here.
subscribe(
        i -> System.out.println("Received :: " + i),   //consumer to handle the element
        err -> System.out.println("Error :: " + err),  //consumer to handle the error
        () -> System.out.println("Successfully completed") //runnable - to do once all the items processed
       )
  • Lets take this example. We get the below output as expected. Here we simply divide 10 by each element.
Flux.just(1,2,3)
        .map(i -> 10 / i)
        .subscribe(
            i -> System.out.println("Received :: " + i),
            err -> System.out.println("Error :: " + err),
            () -> System.out.println("Successfully completed")); 

//Output
Received :: 10
Received :: 5
Received :: 3
Successfully completed
  • Now if we slightly modify our map operation as shown here – we would be doing division by zero which will throw RunTimeException which is handled so well here without the ugly try/catch block.
Flux.just(1,2,3)
        .map(i -> i / (i-2))
        .subscribe(
                i -> System.out.println("Received :: " + i),
                err -> System.out.println("Error :: " + err),
                () -> System.out.println("Successfully completed"));   

//Output
Received :: -1
Error :: java.lang.ArithmeticException: / by zero
  • fromArray – when you have array. just should also work here.
String[] arr = {"Hi", "Hello", "How are you"};

Flux.fromArray(arr)
        .filter(s -> s.length() > 2)
        .subscribe(i -> System.out.println("Received : " + i));

//Output
Received : Hello
Received : How are you
  • fromIterable – When you have collection of elements and like to pass them via Flux pipeline.
List<String> list = Arrays.asList("vins", "guru");
Flux<String> stringFlux = Flux.fromIterable(list)
                        .map(String::toUpperCase);
  • fromStream – If you have stream of elements.
List<String> list = Arrays.asList("vins", "guru");
Flux<String> stringFlux = Flux.fromStream(list.stream())
                                .map(String::toUpperCase);
  • Be careful with Streams!! Flux can have more than 1 observer.  But below code will throw error saying that the stream has been closed.
//observer-1
stringFlux
        .map(String::length)
        .subscribe(i -> System.out.println("Observer-1 :: " + i));
//observer-2
stringFlux
        .subscribe(i -> System.out.println("Observer-2 :: " + i));
  • The above problem can be fixed by using Supplier<Stream>
Flux.fromStream(() -> list.stream())
     .map(String::toUpperCase);
  • range
//To provide a range of numbers
Flux.range(3, 5)
  • In all the above options, we already have elements found before emitting. What if we need to keep on finding and emitting elements programmatically? Flux has 2 additional methods for that. But these 2 methods need a separate article to explain as we need to understand what they are for and when to use what! Check here.
    • Flux.create
    • Flux.generate

Mono:

Mono is an another implementation of Publisher. It emits at most one item and then (optionally) terminates with an onComplete signal or an onError signal.. (Image courtesy: project reactor site). Like Flux, Mono is also asynchronous in nature.

 

  • just – to emit one single item
Mono.just(1)
    .subscribe(System.out::println);
  • Both Flux and Mono extends the Publisher<T> interface.
Publisher<Integer> publisher1 = Mono.just(1);
Publisher<Integer> publisher2 = Flux.just(1,2,3);
  • Using Callable/Supplier
Mono.fromCallable(() -> 1);
Mono.fromSupplier(() -> "a");
  • fromRunnable – We know that runnable does not accept any parameter and does not return anything either. So what do you think the below code will do?
Mono.fromRunnable(() -> System.out.println("Hello"))
        .subscribe(i -> System.out.println("Received :: " + i));
  • The above code would just print “Hello” and nothing else will happen as it is because there is no item to emit. But if we add the error and complete handler, we get the below output. It is helpful if we need to be notified when a runnable is completed.
Mono.fromRunnable(() -> System.out.println("Hello"))
        .subscribe(
                i -> System.out.println("Received :: " + i),
                err -> System.out.println("Error :: " + err),
                () -> System.out.println("Successfully completed"));

//Output
Hello
Successfully completed

Summary:

Hopefully this article gave you a high level idea about creating sequences using Mono & Flux and the comparison of Mono vs Flux.

Learn more about Java Reactive Programming.

Happy learning!!

Share This:

1 thought on “Mono vs Flux In Project Reactor

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.