
Reactor Repeat vs Retry

Overview:

In this part of the reactive programming series with Project Reactor, I would like to show you the difference between Reactor repeat and Reactor retry.

If you are new to reactive programming or Project Reactor, take a look at the rest of this series first to get a good grounding.

Need For Reactor Repeat & Retry:

As we already know, reactive programming is a style of programming that observes data streams, reacts to changes, and propagates them. A data stream is closed when the source has no more data to emit (a complete signal) or when there is an unhandled exception (an error signal).

Sometimes we do not want the process to stop there, and we might have to keep it going, possibly forever.

Reactor Method    Usage
repeat            Restarts the reactive pipeline (re-subscribes to the source) when the source sends the complete signal.
retry             Restarts the reactive pipeline (re-subscribes to the source) when there is an error signal.

Reactor Retry:

As we just saw, retry re-subscribes to the source when we get an error signal. For example, assume that one microservice sends a request to another microservice and expects a response. Because of a network issue, the request times out and an exception is thrown. To make the system more resilient, Reactor can absorb this error and retry the request.
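
To make the microservice scenario concrete, here is a minimal sketch. The flakyCall method is a hypothetical stand-in for the remote request (it is not part of Reactor): it fails on its first two attempts and then succeeds. The sketch assumes reactor-core is on the classpath.

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

class FlakyCallRetry {

    // Hypothetical stand-in for a remote call: times out twice, then responds.
    static Mono<String> flakyCall(AtomicInteger attempts) {
        return Mono.fromCallable(() -> {
            int attempt = attempts.incrementAndGet();
            if (attempt < 3) {
                throw new IllegalStateException("request timed out (attempt " + attempt + ")");
            }
            return "response after " + attempt + " attempts";
        });
    }

    static String callWithRetry(AtomicInteger attempts) {
        // retry(3) re-subscribes up to 3 times, so the callable runs again after each error
        return flakyCall(attempts).retry(3).block();
    }

    public static void main(String[] args) {
        System.out.println(callWithRetry(new AtomicInteger()));
    }
}
```

Because the callable is re-executed on every subscription, the third attempt succeeds and the caller never sees the two timeouts.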

To play with the retry feature, let's take a very simple example. Here our source emits the numbers 1 to 10. Unfortunately, our pipeline cannot process numbers greater than 5: it fails with an error as soon as it receives one.

Flux.range(1, 10)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .map(i -> {
        if(i > 5)
            throw new RuntimeException("Can not process > 5");
        return i;
    })
    .subscribe(i -> System.out.println("Received :: " + i),
            err -> System.out.println("Error :: " + err));

Output:

Emitted :: 1
Received :: 1
Emitted :: 2
Received :: 2
Emitted :: 3
Received :: 3
Emitted :: 4
Received :: 4
Emitted :: 5
Received :: 5
Emitted :: 6
Error :: java.lang.RuntimeException: Can not process > 5

The same pipeline with retry(1) added:

Flux.range(1, 10)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .map(i -> {
        if(i > 5)
            throw new RuntimeException("Can not process > 5");
        return i;
    })
    .retry(1)
    .subscribe(i -> System.out.println("Received :: " + i),
            err -> System.out.println("Error :: " + err));

When we add retry(1) and get the error signal, we re-subscribe to the source. Because the source emits 1 to 10 afresh for every subscription, the sequence starts over from 1. Once the single retry attempt is used up, the next error is passed on to the subscriber as usual.

Output:

Emitted :: 1
Received :: 1
Emitted :: 2
Received :: 2
Emitted :: 3
Received :: 3
Emitted :: 4
Received :: 4
Emitted :: 5
Received :: 5
Emitted :: 6  // will retry now
Emitted :: 1
Received :: 1
Emitted :: 2
Received :: 2
Emitted :: 3
Received :: 3
Emitted :: 4
Received :: 4
Emitted :: 5
Received :: 5
Emitted :: 6
Error :: java.lang.RuntimeException: Can not process > 5
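
Beyond a fixed retry count, retryWhen accepts a Retry strategy:

// retry up to 2 times, waiting 3 seconds before each attempt
.retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(3)))
// retry up to 5 times with exponential backoff, starting from a 1 second delay
.retryWhen(Retry.backoff(5, Duration.ofSeconds(1)))
// retry immediately, without any limit
.retryWhen(Retry.indefinitely())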
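
As a rough illustration of how one of these strategies behaves, the sketch below applies Retry.fixedDelay to a source that fails a configurable number of times. The failing helper and the 50 ms delay are assumptions made for the example, and reactor-core 3.4 or later is assumed to be on the classpath.

```java
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

class RetryWhenDemo {

    // Hypothetical source: fails `failures` times before it succeeds.
    static Mono<String> failing(int failures, AtomicInteger attempts) {
        return Mono.fromCallable(() -> {
            if (attempts.incrementAndGet() <= failures) {
                throw new IllegalStateException("boom");
            }
            return "ok";
        });
    }

    static String withFixedDelay(int failures) {
        AtomicInteger attempts = new AtomicInteger();
        return failing(failures, attempts)
                // at most 2 retries, each delayed by 50 ms
                .retryWhen(Retry.fixedDelay(2, Duration.ofMillis(50)))
                // when the retries run out, Reactor signals a "retry exhausted" error
                .onErrorResume(Exceptions::isRetryExhausted,
                        e -> Mono.just("gave up after " + attempts.get() + " attempts"))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withFixedDelay(2)); // succeeds on the third attempt
        System.out.println(withFixedDelay(3)); // still failing after 2 retries
    }
}
```

With 2 failures the two retries are enough; with 3 failures the retries are exhausted and the fallback value is returned instead.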

Retry.maxInARow(5) limits the number of consecutive failed attempts:

Flux.range(1, 10)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .map(i -> {
        if(i > 0)
            throw new RuntimeException("Can not process > 0");
        return i;
    })
    .retryWhen(Retry.maxInARow(5))
    .subscribe(i -> System.out.println("Received :: " + i),
            err -> System.out.println("Error :: " + err));

When the flux emits 1, we get the error and retry. The flux emits 1 again because of the new subscription, which throws the error once again, and so on. We allow up to 5 retries in a row; after that the pipeline stops with a RetryExhaustedException.

Output:

Emitted :: 1
Emitted :: 1
Emitted :: 1
Emitted :: 1
Emitted :: 1
Emitted :: 1
Error :: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 5/5 in a row (5 total)
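
retryWhen can also take its cue from a companion publisher: after an error, each item the companion emits triggers a re-subscription to the source.

// some source (DirectProcessor is deprecated since Reactor 3.4 in favor of Sinks)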
DirectProcessor<Object> processor = DirectProcessor.create();

// flux pipeline
Flux.range(1, 10)
        .doOnNext(i -> System.out.println("Emitted :: " + i))
        .map(i -> {
            if(i > 0)
                throw new RuntimeException("Can not process > 0");
            return i;
        })
        .retryWhen(Retry.from(s -> processor)) // companion source
        .subscribe(i -> System.out.println("Received :: " + i),
                err -> System.out.println("Error :: " + err));

// somewhere else: each emission triggers one more retry of the flux above
// (sleep is assumed to be a small helper that wraps Thread.sleep)
FluxSink<Object> sink = processor.sink();
for (int i = 0; i < 3; i++) {
    sleep(Duration.ofSeconds(3));
    System.out.println("Sink emission :: " + i);
    sink.next(i);  // makes the flux above re-subscribe and emit 1 again
}

Output:

Emitted :: 1
Sink emission :: 0
Emitted :: 1
Sink emission :: 1
Emitted :: 1
Sink emission :: 2
Emitted :: 1
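
On Reactor 3.4 and later, where DirectProcessor is deprecated, the same idea can be sketched with the Sinks API. This is an adaptation under that assumption, not the code from the example above; the class name and the log list are introduced only to make the result easy to inspect, and the sleep calls are dropped because everything here runs on the calling thread.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.util.retry.Retry;
import java.util.ArrayList;
import java.util.List;

class CompanionRetryWithSinks {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // companion source: every item pushed into this sink triggers one retry
        Sinks.Many<Object> trigger = Sinks.many().multicast().directBestEffort();

        Flux.range(1, 10)
                .doOnNext(i -> log.add("Emitted :: " + i))
                .map(i -> {
                    if (i > 0)
                        throw new RuntimeException("Can not process > 0");
                    return i;
                })
                .retryWhen(Retry.from(signals -> trigger.asFlux()))
                .subscribe(i -> log.add("Received :: " + i),
                        err -> log.add("Error :: " + err));

        // somewhere else
        for (int i = 0; i < 3; i++) {
            log.add("Sink emission :: " + i);
            trigger.tryEmitNext(i);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log should follow the same pattern as the output above: one initial emission of 1, then one more after each sink emission.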

Reactor Repeat:

Repeat is a kind of retry, but it is triggered when the source sends the complete signal rather than an error signal.

Flux.range(1, 3)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .subscribe(i -> System.out.println("Received :: " + i));

This code produces the output below, and the pipeline is closed because there is no more data to process.

Output:

Emitted :: 1
Received :: 1
Emitted :: 2
Received :: 2
Emitted :: 3
Received :: 3

With repeat(1), the source is re-subscribed once after it completes, so 1 to 3 is emitted twice:

Flux.range(1, 3)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .repeat(1)
    .subscribe(i -> System.out.println("Received :: " + i));
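
repeat also accepts a BooleanSupplier and re-subscribes for as long as it returns true. With () -> true the flux repeats indefinitely:

Flux.range(1, 3)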
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .repeat(() -> true)
    .subscribe(i -> System.out.println("Received :: " + i));
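
The two repeat variants above can be checked quickly by collecting the items into a list. This is a small sketch assuming reactor-core is on the classpath; take(n) is added only so that the infinite variant terminates.

```java
import reactor.core.publisher.Flux;
import java.util.List;

class RepeatDemo {

    // repeat(1): one extra subscription after the first completion
    static List<Integer> repeatOnce() {
        return Flux.range(1, 3)
                .repeat(1)
                .collectList()
                .block();
    }

    // repeat(() -> true) never stops on its own, so we cut it off with take(n)
    static List<Integer> repeatForeverButTake(int n) {
        return Flux.range(1, 3)
                .repeat(() -> true)
                .take(n)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(repeatOnce());            // [1, 2, 3, 1, 2, 3]
        System.out.println(repeatForeverButTake(7)); // [1, 2, 3, 1, 2, 3, 1]
    }
}
```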

Like retryWhen, repeatWhen can be driven by a companion publisher:

// some source
DirectProcessor<Object> processor = DirectProcessor.create();

// flux with companion repeat
Flux.range(1, 3)
        .doOnNext(i -> System.out.println("Emitted :: " + i))
        .repeatWhen(i -> processor)
        .subscribe(i -> System.out.println("Received :: " + i));

// random sleep
sleep(Duration.ofSeconds(4));

FluxSink<Object> sink = processor.sink();
sink.next(1); // this will make the above flux repeat

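The companion source does not have to be an external source. The flux passed to repeatWhen emits, on each completion, the number of items the source emitted in the run that just finished (not the last emitted item), and we can use that count to decide whether to continue.

Flux.range(1, 3)
    .doOnNext(i -> System.out.println("Emitted :: " + i))
    .repeatWhen(flux -> flux.handle((emittedCount, sink) -> {
        // emittedCount = number of items emitted by the run that just completed (always 3 here)
        if(emittedCount > 3)
            sink.complete();   // stop repeating
        else
            sink.next(1);      // repeat once more
    }))
    .subscribe(i -> System.out.println("Received :: " + i));

Because Flux.range(1, 3) emits exactly 3 items on every run, the condition emittedCount > 3 is never met, so this pipeline keeps repeating until the subscription is cancelled.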
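
A count-based companion becomes more useful when the number of items differs between runs. The sketch below assumes a hypothetical polling source, pollBatch, that drains at most two items from an in-memory queue per subscription, and it keeps repeating until a run comes back empty. The companion flux here carries the number of items emitted by each completed run.

```java
import reactor.core.publisher.Flux;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class RepeatWhenCountDemo {

    // Hypothetical polling source: each subscription takes up to 2 items off the queue.
    static Flux<Integer> pollBatch(Deque<Integer> queue) {
        return Flux.defer(() -> {
            List<Integer> batch = new ArrayList<>();
            for (int i = 0; i < 2 && !queue.isEmpty(); i++) {
                batch.add(queue.poll());
            }
            return Flux.fromIterable(batch);
        });
    }

    static List<Integer> drain(List<Integer> items) {
        Deque<Integer> queue = new ArrayDeque<>(items);
        return pollBatch(queue)
                // counts emits how many items each completed run produced;
                // takeWhile completes the companion, and with it the whole flux, after an empty run
                .repeatWhen(counts -> counts.takeWhile(count -> count > 0))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(drain(List.of(1, 2, 3, 4, 5))); // [1, 2, 3, 4, 5]
    }
}
```

Completing the companion is what ends the repetition here; an error from the companion would terminate the flux with that error instead.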

Summary:

We were able to demonstrate the difference between Reactor repeat and Reactor retry, and when to use which: repeat re-subscribes to the source on the complete signal, while retry re-subscribes on the error signal.

Learn more about Reactor / Java reactive programming.

Happy learning 🙂
