Reactor limitRate Example

Overview:

In this tutorial, let's see how to use Reactor limitRate to limit the rate of incoming or outgoing messages through a reactive channel. If you are new to Project Reactor, take a look at the introductory Reactor articles first for a better understanding.

Reactor limitRate:

In reactive programming, we have subscribers (downstream) and publishers (upstream). Sometimes a downstream service requests an unbounded amount of data from the source (upstream). This can hurt the performance of the source (say the source is a DB). In that case, it is better to place an operator between the source and the downstream subscriber that understands the downstream demand and prefetches data based on the downstream processing speed, instead of fetching all the data at once.

This is where Reactor limitRate helps!

Subscription Request:

Let's take a look at this example. Here the subscriber requests 1000 elements at once, and we simply print each value as the source emits it. But if each element needed a time-consuming operation, having the upstream produce up to 1000 elements and keeping them all in memory while we slowly process them one by one would waste resources. So we need a way to limit the requests.

Flux.range(1, 100)
        .log()
        .subscribe(
                System.out::println,  // onNext consumer: print each element
                System.out::println,  // error handler
                () -> {},             // onComplete: nothing to do
                s -> s.request(1000)  // subscription consumer: request 1000 elements at once
        );

Output:

16:14:55.472 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
16:14:55.475 [main] INFO reactor.Flux.Range.1 - | request(1000)
16:14:55.475 [main] INFO reactor.Flux.Range.1 - | onNext(1)
...
...
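
To make the idea of limiting requests concrete, here is a minimal sketch of a subscriber that asks for a small batch and only asks for more once that batch has been processed. It uses the JDK's java.util.concurrent.Flow interfaces rather than Reactor so that it runs without any dependency. BatchedRequestDemo, RangeSubscription and BatchingSubscriber are hypothetical names made up for this illustration, and the source is a simplified synchronous one, not a full Reactive Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BatchedRequestDemo {

    /** Hypothetical synchronous source emitting 1..count, only as far as demand allows. */
    static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int count;
        private final List<Long> requests; // every request(n) the source receives
        private int next = 1;
        private long demand;
        private boolean emitting;
        private boolean done;

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int count, List<Long> requests) {
            this.subscriber = subscriber;
            this.count = count;
            this.requests = requests;
        }

        @Override
        public void request(long n) {
            if (done || n <= 0) {
                return; // simplified: a complete implementation would signal onError for n <= 0
            }
            requests.add(n);
            demand += n;
            if (emitting) {
                return; // re-entrant call from onNext; the running loop picks up the new demand
            }
            emitting = true;
            while (!done && demand > 0 && next <= count) {
                demand--;
                subscriber.onNext(next++);
            }
            emitting = false;
            if (!done && next > count) {
                done = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    /** Subscriber that requests a batch, then re-requests once the batch has been processed. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        private final int batch;
        private Flow.Subscription subscription;
        private int inBatch;
        final List<Integer> received = new ArrayList<>();

        BatchingSubscriber(int batch) {
            this.batch = batch;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(batch); // bounded initial demand instead of request(1000)
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (++inBatch == batch) {
                inBatch = 0;
                subscription.request(batch); // ask for the next batch only now
            }
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
        }
    }

    /** Runs the demo and returns the request(n) amounts seen by the source. */
    static List<Long> run(int count, int batch) {
        List<Long> requests = new ArrayList<>();
        BatchingSubscriber subscriber = new BatchingSubscriber(batch);
        subscriber.onSubscribe(new RangeSubscription(subscriber, count, requests));
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(run(100, 10));
    }
}
```

With 100 elements and a batch of 10, the source never holds more than 10 elements of outstanding demand. The limitRate operator covered in this article gives a similar effect without writing a custom subscriber.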

Default Behavior:

Let's first understand Reactor's default behavior when no limit is applied.

Flux.range(1, 100)
        .log()
        .delayElements(Duration.ofMillis(100))
        .subscribe(System.out::println);

Output:

  • I have removed the excessive logs and replaced the omitted sequence with ‘…’.
  • By default, the upstream receives a request for 32 elements. 32 elements are produced and published to the downstream.
  • Once 24 elements have been drained downstream, the upstream receives another request for 24 elements. (The log shows 23 at that point because the 24th element is still being delayed by the delayElements operator.)
  • This repeats until the upstream sends a complete or error signal.
15:12:30.510 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
15:12:30.516 [main] INFO reactor.Flux.Range.1 - | request(32)
15:12:30.517 [main] INFO reactor.Flux.Range.1 - | onNext(1)
15:12:30.583 [main] INFO reactor.Flux.Range.1 - | onNext(2)
...
...
15:12:30.584 [main] INFO reactor.Flux.Range.1 - | onNext(32)
1
2
...
23
15:12:32.884 [parallel-3] INFO reactor.Flux.Range.1 - | request(24)
15:12:32.885 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(33)
...
15:12:32.886 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(56)
24
...
47
15:12:35.293 [parallel-3] INFO reactor.Flux.Range.1 - | request(24)
15:12:35.293 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(57)
...
15:12:35.294 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(80)
48
...
71
15:12:37.701 [parallel-3] INFO reactor.Flux.Range.1 - | request(24)
15:12:37.701 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(81)
...
15:12:37.703 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(99)
15:12:37.703 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(100)
15:12:37.704 [parallel-3] INFO reactor.Flux.Range.1 - | onComplete()
72
...
95
15:12:40.111 [parallel-3] INFO reactor.Flux.Range.1 - | request(24)
96
97
98
99
100
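
The request pattern in this log can be reproduced with a small simulation. This is only a sketch of the bookkeeping, not Reactor code: PrefetchSimulation and upstreamRequests are hypothetical names, and the sketch assumes that a refill is requested each time `refill` elements have been passed downstream.

```java
import java.util.ArrayList;
import java.util.List;

class PrefetchSimulation {

    /**
     * Returns the request(n) amounts the upstream would see for `total` elements,
     * given an initial `prefetch` and a `refill` amount requested after every
     * `refill` elements handed to the downstream.
     */
    static List<Integer> upstreamRequests(int total, int prefetch, int refill) {
        if (total < 0 || prefetch <= 0 || refill <= 0) {
            throw new IllegalArgumentException("total must be >= 0, prefetch and refill must be > 0");
        }
        List<Integer> requests = new ArrayList<>();
        requests.add(prefetch); // initial demand
        int sinceLastRequest = 0;
        for (int consumed = 1; consumed <= total; consumed++) {
            if (++sinceLastRequest == refill) {
                requests.add(refill); // replenish once `refill` elements have been drained
                sinceLastRequest = 0;
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        // Same numbers as the log above: 100 elements, request(32), then request(24)
        System.out.println(upstreamRequests(100, 32, 24)); // [32, 24, 24, 24, 24]
    }
}
```

The four refills of 24 correspond to the four request(24) lines in the log, the last of which arrives after the upstream has already completed.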

Limit Rate:

Reactor limitRate can control this behavior by sending a specific demand to the upstream. In this case, we set the limit to 10.

Flux.range(1, 100)
        .log()
        .limitRate(10)
        .delayElements(Duration.ofMillis(100))
        .subscribe(System.out::println);

 

Output:

15:18:11.012 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
15:18:11.017 [main] INFO reactor.Flux.Range.1 - | request(10)
15:18:11.021 [main] INFO reactor.Flux.Range.1 - | onNext(1)
...
15:18:11.079 [main] INFO reactor.Flux.Range.1 - | onNext(10)
1
2
3
4
5
6
7
15:18:11.778 [parallel-3] INFO reactor.Flux.Range.1 - | request(8)
15:18:11.778 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(11)
...
...
...
...
15:18:19.005 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(90)
80
...
87
15:18:19.808 [parallel-3] INFO reactor.Flux.Range.1 - | request(8)
15:18:19.808 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(91)
...
15:18:19.808 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(98)
88
...
95
15:18:20.610 [parallel-3] INFO reactor.Flux.Range.1 - | request(8)
15:18:20.611 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(99)
15:18:20.611 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(100)
15:18:20.612 [parallel-3] INFO reactor.Flux.Range.1 - | onComplete()
96
97
98
99
100

Once 75% of the requested data has been drained/emitted, limitRate automatically requests a refill. That is why we see a request for 10 first, followed by a request for 8 each time after that (75% of 10 is 7.5, which is rounded up to 8).
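
The arithmetic behind the refill amount can be sketched as follows. This assumes the 75% is computed with integer arithmetic as the prefetch minus a quarter of it; that assumption reproduces both the 10 → 8 and the 32 → 24 values seen in the logs, but ReplenishMath and replenishAmount are hypothetical names, not part of the Reactor API.

```java
class ReplenishMath {

    /** Refill amount for a given prefetch, assuming "prefetch minus a quarter" integer math. */
    static int replenishAmount(int prefetch) {
        return prefetch - (prefetch >> 2); // prefetch >> 2 is prefetch / 4, rounded down
    }

    public static void main(String[] args) {
        System.out.println(replenishAmount(10)); // 8
        System.out.println(replenishAmount(32)); // 24
    }
}
```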

High Tide / Low tide:

High tide is simply the initial prefetch amount. The subsequent refill amount can be adjusted with low tide, as shown here. A small low tide results in more frequent refill requests to the upstream.

Flux.range(1, 100)
        .log()
        .limitRate(10, 2)
        .delayElements(Duration.ofMillis(100))
        .subscribe(System.out::println);

Output:

17:48:20.077 [main] INFO reactor.Flux.Range.1 - | request(10)
17:48:20.079 [main] INFO reactor.Flux.Range.1 - | onNext(1)
...
17:48:20.124 [main] INFO reactor.Flux.Range.1 - | onNext(10)
1
17:48:20.223 [parallel-1] INFO reactor.Flux.Range.1 - | request(2)
17:48:20.225 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(11)
17:48:20.225 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(12)
2
3
17:48:20.426 [parallel-3] INFO reactor.Flux.Range.1 - | request(2)
17:48:20.426 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(13)
17:48:20.426 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(14)
4
5
17:48:20.627 [parallel-1] INFO reactor.Flux.Range.1 - | request(2)
...
...
...
98
99
17:48:30.065 [parallel-3] INFO reactor.Flux.Range.1 - | request(2)
100

Low Tide = High Tide:

When low tide = high tide, limitRate reverts to the default replenish strategy of 75% of high tide.

Low tide = 0:

When low tide = 0, early requests to the upstream are fully disabled. Instead, all the items are fully drained before a refill is requested.

Flux.range(1, 100)
        .log()
        .limitRate(10, 0)
        .delayElements(Duration.ofMillis(100))
        .subscribe(System.out::println);

Output:

17:45:45.888 [main] INFO reactor.Flux.Range.1 - | request(10)
17:45:45.889 [main] INFO reactor.Flux.Range.1 - | onNext(1)
...
17:45:45.944 [main] INFO reactor.Flux.Range.1 - | onNext(10)
1
..
9
17:45:46.837 [parallel-1] INFO reactor.Flux.Range.1 - | request(10)
17:45:46.838 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(11)
...
17:45:46.838 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(20)
10
...
19
17:45:47.841 [parallel-3] INFO reactor.Flux.Range.1 - | request(10)
17:45:47.841 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(21)
...
17:45:47.842 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(29)
17:45:47.842 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(30)
20
...
29
17:45:48.844 [parallel-1] INFO reactor.Flux.Range.1 - | request(10)
...
...
17:45:54.866 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(99)
17:45:54.866 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(100)
17:45:54.868 [parallel-1] INFO reactor.Flux.Range.1 - | onComplete()
90
...
99
17:45:55.871 [parallel-3] INFO reactor.Flux.Range.1 - | request(10)
100
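
The three low tide cases covered above (a value below high tide, a value equal to high tide, and 0) can be summarized in one small function. This is a sketch of the rules as described in this article, not Reactor's implementation: TideRules and refillAmount are hypothetical names, the 75% is assumed to be computed as high tide minus a quarter of it, and treating a low tide greater than high tide like the "equal" case is also an assumption.

```java
class TideRules {

    /** Amount requested on each refill for a given high tide / low tide pair. */
    static int refillAmount(int highTide, int lowTide) {
        if (highTide <= 0 || lowTide < 0) {
            throw new IllegalArgumentException("highTide must be > 0 and lowTide must be >= 0");
        }
        if (lowTide == 0) {
            // Early refill disabled: drain everything, then request a full high tide again.
            return highTide;
        }
        if (lowTide >= highTide) {
            // Back to the default strategy of roughly 75% of high tide
            // (the "greater than" part is an assumption, see the note above).
            return highTide - (highTide >> 2);
        }
        return lowTide;
    }

    public static void main(String[] args) {
        System.out.println(refillAmount(10, 2));  // 2
        System.out.println(refillAmount(10, 10)); // 8
        System.out.println(refillAmount(10, 0));  // 10
    }
}
```

These values line up with the request(2), request(8) and request(10) refills seen in the logs of the examples above.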

Summary:

We demonstrated how to use Reactor limitRate to control the demand sent by downstream subscribers so that it does not put stress on the source. This is helpful for sources like a DB, which can then prefetch only a specific number of records instead of fetching all of them.

Read more about Reactor / Reactive Programming.

Happy learning 🙂
