Mono vs Flux In Project Reactor

Overview:

In this tutorial, we compare Mono and Flux in Project Reactor (Java reactive programming). If you have not read the previous article, an introduction to reactive programming, read that first.

Java 8 – Optional vs Stream:

Java 8 introduced Stream and Optional.

Stream Pipeline:

Let's first see how a stream works.

//simple list
List<Integer> intList = Arrays.asList(1,2,3,4,5);
//creating stream from list
Stream<Integer> intStream = intList.stream()
                                    .filter(i -> i > 2 ) //keep elements greater than 2
                                    .map(i -> i * i); //convert i into its square
//collect all the squares into a separate list
List<Integer> list1 = intStream
        .collect(Collectors.toList());  
//print
System.out.println(list1);

//output
[9, 16, 25]

Now let's add this statement to collect the calculated squares into list2 and see what happens.

List<Integer> list2 = intStream
        .collect(Collectors.toList());

It throws the following exception, because a stream can be consumed only once.

java.lang.IllegalStateException: stream has already been operated upon or closed
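
The exception arises because a Stream instance supports only one terminal operation. Below is a minimal sketch of one common workaround, assuming we wrap the pipeline in a Supplier so that every call builds a fresh stream. The class and method names (StreamReuseDemo, squaresOver, secondCollectFails) are made up for illustration and are not part of the original example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamReuseDemo {

    // Builds a new pipeline on every get(), so each terminal operation has its own stream.
    static Supplier<Stream<Integer>> squaresOver(List<Integer> source, int threshold) {
        return () -> source.stream()
                .filter(i -> i > threshold)
                .map(i -> i * i);
    }

    // Returns true if collecting the same Stream instance a second time fails.
    static boolean secondCollectFails(List<Integer> source) {
        Stream<Integer> stream = source.stream().map(i -> i * i);
        stream.collect(Collectors.toList());
        try {
            stream.collect(Collectors.toList());
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Supplier<Stream<Integer>> squares = squaresOver(intList, 2);

        List<Integer> list1 = squares.get().collect(Collectors.toList());
        List<Integer> list2 = squares.get().collect(Collectors.toList());

        System.out.println(list1);                       // [9, 16, 25]
        System.out.println(list2);                       // [9, 16, 25]
        System.out.println(secondCollectFails(intList)); // true
    }
}
```

The Supplier does not make a stream reusable; it only defers construction so that each consumer starts from a new stream over the same source.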

Optional Pipeline:

An Optional pipeline looks more or less like this.

Optional<Integer> optional = Optional.of(1)
                .filter(i -> i > 2)
                .map(i -> i * 2);
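
In the pipeline above, the value 1 does not pass the filter i > 2, so the result is an empty Optional and map is never applied. The sketch below makes that visible by running the same two steps on different inputs. The helper name doubleIfOverTwo is hypothetical.

```java
import java.util.Optional;

class OptionalPipelineDemo {

    // Same filter and map steps as the pipeline above, applied to any input value.
    static Optional<Integer> doubleIfOverTwo(int value) {
        return Optional.of(value)
                .filter(i -> i > 2)
                .map(i -> i * 2);
    }

    public static void main(String[] args) {
        System.out.println(doubleIfOverTwo(1)); // Optional.empty
        System.out.println(doubleIfOverTwo(3)); // Optional[6]
    }
}
```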

Mono vs Flux:

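If you understand how Optional and Stream relate to each other, Mono and Flux relate in a similar way in Java reactive programming with Project Reactor: a Mono carries at most one element, as an Optional does, and a Flux carries 0 to N elements, as a Stream does.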

As per the Reactive Streams specification, a Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). Reactor-core provides a set of implementations of this Publisher interface. The two important implementations from which we create sequences are Mono and Flux.
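
To make "according to the demand received" concrete, here is a rough sketch that uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces. This is not how Reactor implements Flux. The RangePublisher class is a hypothetical, single-threaded illustration that ignores parts of the specification, such as reentrant or invalid requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandDemo {

    // Hypothetical publisher of the integers start..start+count-1; it emits only what is requested.
    static class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = start;
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items, then complete once the range is exhausted.
                    for (long i = 0; i < n && !done && next < start + count; i++) {
                        subscriber.onNext(next++);
                    }
                    if (!done && next == start + count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Subscribes, requests the given demand once, and records every signal received.
    static List<String> signalsFor(int start, int count, long demand) {
        List<String> signals = new ArrayList<>();
        new RangePublisher(start, count).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(Integer item) { signals.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { signals.add("onError"); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(signalsFor(1, 3, 2));  // [onNext(1), onNext(2)]
        System.out.println(signalsFor(1, 3, 10)); // [onNext(1), onNext(2), onNext(3), onComplete]
    }
}
```

With a demand of 2 the subscriber sees only two elements and no completion signal, because the publisher is still waiting for further requests.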

Flux:

Flux is an implementation of Publisher. It emits 0 to N elements, followed by a completion or an error signal. A Stream pipeline runs synchronously when its terminal operation is called, whereas a Flux pipeline can deliver its elements asynchronously. It emits values only when there is a downstream subscriber.
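
The point that a Flux emits nothing until it has a subscriber can be checked with a small sketch. It assumes reactor-core is on the classpath. The class name LazyFluxDemo and the call counter are illustrative only.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class LazyFluxDemo {

    // Returns how many times the map function ran before and after subscribing.
    static int[] mapCallsBeforeAndAfterSubscribe() {
        AtomicInteger calls = new AtomicInteger();

        // Assembling the pipeline does not run it.
        Flux<Integer> flux = Flux.just(1, 2, 3)
                .map(i -> {
                    calls.incrementAndGet();
                    return i * 10;
                });
        int before = calls.get();

        // Subscribing triggers the emission; Flux.just emits on the calling thread here.
        flux.subscribe(i -> System.out.println("Received : " + i));
        int after = calls.get();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = mapCallsBeforeAndAfterSubscribe();
        System.out.println("before=" + calls[0] + ", after=" + calls[1]);
    }
}
```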

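An empty Flux completes without emitting any element.

Flux.empty()
        .subscribe(i -> System.out.println("Received : " + i));

//No output

Flux.just creates a Flux from the elements passed to it.

Flux.just(1)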
        .subscribe(i -> System.out.println("Received : " + i));

//Output
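Received : 1

A Flux can have more than one subscriber. With Flux.just, each subscriber receives the full sequence.

Flux<Integer> flux = Flux.just(1);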
//Observer 1
flux.subscribe(i -> System.out.println("Observer-1 : " + i));
//Observer 2
flux.subscribe(i -> System.out.println("Observer-2 : " + i));

//Output
Observer-1 : 1
Observer-2 : 1

Flux.just also accepts multiple elements.

Flux.just('a', 'b', 'c')
        .subscribe(i -> System.out.println("Received : " + i));

//Output
Received : a
Received : b
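Received : c

The next example shows the asynchronous behaviour. delayElements delivers the elements on a different thread, so the main thread prints Ends before any element arrives.

System.out.println("Starts");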

//flux emits one element per second
Flux<Character> flux = Flux.just('a', 'b', 'c', 'd')
                            .delayElements(Duration.ofSeconds(1));
//Observer 1 - takes 500ms to process
flux
        .map(Character::toUpperCase)
        .subscribe(i -> {
            sleep(500); //helper (not shown), assumed to wrap Thread.sleep and its checked exception
            System.out.println("Observer-1 : " + i);
        });
//Observer 2 - process immediately
flux.subscribe(i -> System.out.println("Observer-2 : " + i));

System.out.println("Ends");

//Block the main thread - otherwise the program ends after printing only the Starts and Ends messages
Thread.sleep(10000);

//Output
Starts
Ends
Observer-2 : a
Observer-1 : A
Observer-2 : b
Observer-1 : B
Observer-2 : c
Observer-2 : d
Observer-1 : C
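Observer-1 : D

Adding log() to the pipeline prints the signals exchanged with the source. With the two observers above, the signals appear twice, once per subscriber.

Flux<Character> flux = Flux.just('a', 'b', 'c', 'd')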
        .log()
        .delayElements(Duration.ofSeconds(1));

Output:

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(32)
[ INFO] (main) | onNext(a)
[ INFO] (main) | onNext(b)
[ INFO] (main) | onNext(c)
[ INFO] (main) | onNext(d)
[ INFO] (main) | onComplete()
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(32)
[ INFO] (main) | onNext(a)
[ INFO] (main) | onNext(b)
[ INFO] (main) | onNext(c)
[ INFO] (main) | onNext(d)
[ INFO] (main) | onComplete()

subscribe also accepts an error consumer and a completion callback.

subscribe(
        i -> System.out.println("Received :: " + i),   //consumer to handle the element
        err -> System.out.println("Error :: " + err),  //consumer to handle the error
        () -> System.out.println("Successfully completed") //runnable - runs once all the items are processed
       )

In the next example every element is processed, so the completion callback runs.

Flux.just(1,2,3)
        .map(i -> 10 / i)
        .subscribe(
            i -> System.out.println("Received :: " + i),
            err -> System.out.println("Error :: " + err),
            () -> System.out.println("Successfully completed")); 

//Output
Received :: 10
Received :: 5
Received :: 3
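Successfully completed

Here the second element causes a division by zero. The error consumer is invoked, and neither the remaining element nor the completion signal is delivered.

Flux.just(1,2,3)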
        .map(i -> i / (i-2))
        .subscribe(
                i -> System.out.println("Received :: " + i),
                err -> System.out.println("Error :: " + err),
                () -> System.out.println("Successfully completed"));   

//Output
Received :: -1
Error :: java.lang.ArithmeticException: / by zero

A Flux can be created from an array.

String[] arr = {"Hi", "Hello", "How are you"};

Flux.fromArray(arr)
        .filter(s -> s.length() > 2)
        .subscribe(i -> System.out.println("Received : " + i));

//Output
Received : Hello
Received : How are you

A Flux can be created from any Iterable, such as a List.

List<String> list = Arrays.asList("vins", "guru");
Flux<String> stringFlux = Flux.fromIterable(list)
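                        .map(String::toUpperCase);

A Flux can also be created from a Stream. A Stream can be consumed only once, so the second subscriber below fails with an IllegalStateException.

List<String> list = Arrays.asList("vins", "guru");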
Flux<String> stringFlux = Flux.fromStream(list.stream())
                                .map(String::toUpperCase);
//observer-1
stringFlux
        .map(String::length)
        .subscribe(i -> System.out.println("Observer-1 :: " + i));
//observer-2
stringFlux
        .subscribe(i -> System.out.println("Observer-2 :: " + i));

Passing a Supplier of the Stream gives every subscriber a fresh stream.

Flux.fromStream(() -> list.stream())
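     .map(String::toUpperCase);

Flux.range emits a sequence of consecutive integers. The first argument is the start value and the second is the number of elements.

//emits 5 numbers starting at 3: 3, 4, 5, 6, 7
Flux.range(3, 5)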

Mono:

Mono is another implementation of Publisher. It emits at most one item and then terminates with an onComplete or an onError signal. Like Flux, Mono is asynchronous in nature.

Mono.just(1)
    .subscribe(System.out::println);

Both Mono and Flux implement Publisher, so either can be assigned to a Publisher reference.

Publisher<Integer> publisher1 = Mono.just(1);
Publisher<Integer> publisher2 = Flux.just(1,2,3);

A Mono can also be created from a Callable or a Supplier, which is invoked on subscription.

Mono.fromCallable(() -> 1);
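Mono.fromSupplier(() -> "a");

Mono.fromRunnable runs the given Runnable and then completes without emitting an item, so the element consumer below is never called.

Mono.fromRunnable(() -> System.out.println("Hello"))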
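        .subscribe(i -> System.out.println("Received :: " + i));

With the three-argument subscribe, the completion callback shows that the Mono completed without an item.

Mono.fromRunnable(() -> System.out.println("Hello"))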
        .subscribe(
                i -> System.out.println("Received :: " + i),
                err -> System.out.println("Error :: " + err),
                () -> System.out.println("Successfully completed"));

//Output
Hello
Successfully completed
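
Because a Mono emits at most one item, its result can be read back as an Optional, which ties in with the Optional comparison earlier in the article. The sketch below assumes reactor-core is on the classpath. The class and method names are hypothetical, and blockOptional() is used only to keep the demo short, since blocking is generally avoided in reactive code.

```java
import java.util.Optional;
import reactor.core.publisher.Mono;

class MonoAsOptionalDemo {

    // Same filter and map steps as the Optional pipeline, expressed with Mono.
    static Optional<Integer> doubleIfOverTwo(int value) {
        return Mono.just(value)
                .filter(i -> i > 2)   // an item that fails the filter leaves an empty Mono
                .map(i -> i * 2)
                .blockOptional();     // waits for the result; an empty Mono gives Optional.empty()
    }

    public static void main(String[] args) {
        System.out.println(doubleIfOverTwo(1)); // Optional.empty
        System.out.println(doubleIfOverTwo(3)); // Optional[6]
    }
}
```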

Summary:

Hopefully this article gave you a high-level idea of how to create sequences using Mono and Flux, and of how the two compare.

Learn more about Java Reactive Programming.

Happy learning!!
