reactor hot publisher

Reactor Flux Create vs Generate

Overview:

In this tutorial, I would like to show the difference between the Reactor Flux Create vs Generate with code samples.

If you are new to Java Reactive Programming, please take a look at below articles to give you an idea.

  1. Reactive Programming – A Simple Introduction
  2. Mono vs Flux In Project Reactor
  3. Reactor Hot Publisher vs Cold Publisher

Flux Create vs Generate:

Both Flux Create and Flux Generate methods are used to programmatically generate sequences for Flux/Mono. But there are differences between these two and when to use what etc. That is what this article is about.

Flux Create:

The create method accepts a FluxSink<T> consumer. That is, you would be given an instance of the FluxSink using which you can keep on emitting O …. N elements to the downstream subscribers. Each subscriber would get an instance of FluxSink to emit elements (Cold subscribers).

flux create

Lets consider this example to create a sequence using Flux.create.

Flux<Integer> integerFlux = Flux.create((FluxSink<Integer> fluxSink) -> {
    IntStream.range(0, 5)
            .peek(i -> System.out.println("going to emit - " + i))
            .forEach(fluxSink::next);
});

Now lets assume that we have 2 downstream subscribers.

//First observer. takes 1 ms to process each element
integerFlux.delayElements(Duration.ofMillis(1)).subscribe(i -> System.out.println("First :: " + i));

//Second observer. takes 2 ms to process each element
integerFlux.delayElements(Duration.ofMillis(2)).subscribe(i -> System.out.println("Second:: " + i));

If we run the above code, I get the below output.

going to emit - 0
going to emit - 1
going to emit - 2
going to emit - 3
going to emit - 4
going to emit - 0
going to emit - 1
going to emit - 2
going to emit - 3
going to emit - 4
First :: 0
Second:: 0
First :: 1
First :: 2
Second:: 1
First :: 3
First :: 4
Second:: 2
Second:: 3
Second:: 4

What we can understand from the above output is,

  • Each observer gets its own FluxSink instance which is expected as we create a Cold publisher.
  • Create method does not wait for the observer to process the elements. It emits the elements even before observers start processing the elements.

Now, the obvious question could be, What if the observer can not keep up? Create method accepts one more parameter which defines the Overflow strategy & what needs to be done. Default behavior is buffer.

Flux<Integer> integerFlux = Flux.create((FluxSink<Integer> fluxSink) -> {
            IntStream.range(0, 5)
                    .peek(i -> System.out.println("going to emit - " + i))
                    .forEach(fluxSink::next);
        }, FluxSink.OverflowStrategy.DROP);

We could pass either of these values. Check the API here.

We can also get the reference of the FluxSink instance and emit elements outside the create method as and when we need. it does not have to happen inside the create method.

Simple Implementation Of Consumer Of FluxSink :

import reactor.core.publisher.FluxSink;
import java.util.function.Consumer;

public class FluxSinkImpl implements Consumer<FluxSink<Integer>> {

    private FluxSink<Integer> fluxSink;

    @Override
    public void accept(FluxSink<Integer> integerFluxSink) {
        this.fluxSink = integerFluxSink;
    }

    public void publishEvent(int event){
        this.fluxSink.next(event);
    }

}

Emitting Elements:

//create an instance of FluxSink implementation
FluxSinkImpl fluxSinkConsumer = new FluxSinkImpl();

//create method can accept this instance
Flux<Integer> integerFlux = Flux.create(fluxSinkConsumer).delayElements(Duration.ofMillis(1)).share();
integerFlux.delayElements(Duration.ofMillis(1)).subscribe(i -> System.out.println("First :: " + i));
integerFlux.delayElements(Duration.ofMillis(2)).subscribe(i -> System.out.println("Second:: " + i));

//We emit elements here
IntStream.range(0, 5)
        .forEach(fluxSinkConsumer::publishEvent);

Output:

First :: 0
Second:: 0
First :: 1
First :: 2
Second:: 1
First :: 3
First :: 4
Second:: 2
Second:: 3
Second:: 4

Multi-Thread Asynchronous Emitting:

I can also use multiple threads to emit elements via my FluxSink to the downstream subscribers.

Runnable runnable = () -> {
    IntStream.range(0, 5)
            .forEach(fluxSinkConsumer::publishEvent);
};

for (int i = 0; i < 3; i++) {
    new Thread(runnable).start();
}

Flux Generate:

The generate method is slightly different from the create method as shown below. Here it accepts a Consumer of SynchronousSink<T>. In the above create method, we were able to pass a consumer which could emit O …… N elements. But with generate method, we can pass a consumer which could emit only one element!!

 

flux generate

Does it mean this flux can emit only one element at the max?

No. This generate method can also emit potentially infinite number of elements. But what we mean here is that the consumer block can emit only one element along with an optional complete or error call. That is, generate method keeps on emitting elements one-by-one based on the demand from the downstream by invoking the consumer. Consumer itself cannot have loop to emit elements. If observers are not interested in processing further elements, generate would not emit elements.

To understand this behavior, run these codes one by one.

Create behavior

Flux<Integer> integerFlux = Flux.create((FluxSink<Integer> fluxSink) -> {
    System.out.println("Flux create");
    IntStream.range(0, 100)
            .peek(i -> System.out.println("going to emit - " + i))
            .forEach(fluxSink::next);
});
integerFlux.delayElements(Duration.ofMillis(50))
        .subscribe(i -> {
            System.out.println("First consumed ::" + i);
        });
  • In the above Consumer<FluxSink<Integer>> we have a loop which keeps on emitting elements
  • We could see only one ‘Flux create’ message
  • We could see 100 ‘going to emit’ statements
  • Then all the 100 ‘First consumed’ statements one by one

Generate behavior:

AtomicInteger atomicInteger = new AtomicInteger();

//Flux generate sequence
Flux<Integer> integerFlux = Flux.generate((SynchronousSink<Integer> synchronousSink) -> {
   System.out.println("Flux generate");
   synchronousSink.next(atomicInteger.getAndIncrement());
});

//observer
integerFlux.delayElements(Duration.ofMillis(50))
        .subscribe(i -> System.out.println("First consumed ::" + i));
  • We could see 32 ‘Flux generate’ messages
  • We could see 23 ‘First consumed’ messages
  • Again a bunch of ‘Flux generate’ messages
  • Again a bunch of ‘First consumed’ messages

What is going on here?? If this behavior is very confusing, basically this is what happens. Generate method emits elements based on the demand from the downstream. It generates 32 elements first and buffers it. As and when downstream starts processing elements and when the buffer size drops the below threshold, it emits few more elements. This process is repeated again and again. At one point, if the observer stops processing elements, Flux.generate will also stop emitting elements.  So this generate method is aware of downstream observers processing speed.

Lets try this code. Here we try to emit 2 elements at a time.

AtomicInteger atomicInteger = new AtomicInteger();
Flux<Integer> integerFlux = Flux.generate((SynchronousSink<Integer> synchronousSink) -> {
   System.out.println("Flux generate");
    synchronousSink.next(atomicInteger.getAndIncrement());
    synchronousSink.next(atomicInteger.getAndIncrement());
});

The output is as shown here! It is illegal to emit more than one element using a SynchronousSink.

Flux generate
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: More than one call to onNext
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: More than one call to onNext
Caused by: java.lang.IllegalStateException: More than one call to onNext

As we can not emit more than one element, It does not make sense to get the reference of SynchronousSink as we did for FluxSink above to programmatically emit elements.

Generate method can also maintain state if required.

//To supply an initial state
Callable<Integer> initialState = () -> 65;

//BiFunction to consume the state, emit value, change state
BiFunction<Integer, SynchronousSink<Character>, Integer> generator = (state, sink) -> {
    char value = (char) state.intValue();
    sink.next(value);
    if (value == 'Z') {
        sink.complete();
    }
    return state + 1;
};

//Flux which accepts initialstate and bifunction as arg
Flux<Character> charFlux = Flux.generate(initialState, generator);

//Observer
charFlux.delayElements(Duration.ofMillis(50))
      .subscribe(i -> System.out.println("Consumed ::" + i));

The above code produced below output.

Consumed ::A
Consumed ::B
Consumed ::C
Consumed ::D
Consumed ::E
Consumed ::F
Consumed ::G
Consumed ::H
Consumed ::I
Consumed ::J
Consumed ::K
Consumed ::L
Consumed ::M
Consumed ::N
Consumed ::O
Consumed ::P
Consumed ::Q
Consumed ::R
Consumed ::S
Consumed ::T
Consumed ::U
Consumed ::V
Consumed ::W
Consumed ::X
Consumed ::Y
Consumed ::Z

Flux Create vs Generate:

Flux Create Flux Generate
Accepts a Consumer<FluxSink<T>> Accepts a Consumer<SynchronousSink<T>>
Consumer is invoked only once Consumer is invoked again and again based on the downstream demand
Consumer can emit 0..N elements immediately Consumer can emit only one element
Publisher is not aware of downstream state. So we need to provide Overflow strategy as an additional parameter Publisher produces elements based on the downstream demand
We can get the reference of FluxSink using which we could keep on emitting elements using multiple threads if required We can get the reference of SynchronousSink. But it might not be really useful as we could emit only one element

Summary:

Hopefully this article gave you an idea about the difference between Flux Create vs Generate in Project Reactor.

Read more about Project Reactor / Java Reactive Programming.

Happy learning 🙂

 

 

Share This:

3 thoughts on “Reactor Flux Create vs Generate

  1. How to emit the flux to the web browser on real time data update (I am using onEventListener of firestore db)? I get the flux with real time data update in my console, using .subscribe, but unable to emit that as response to the browser/postman. Please help.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.