RSocket Load Balancing – Client Side

Overview:

In this tutorial, I would like to demonstrate client-side load balancing with Spring RSocket.

If you are new to RSocket, take a look at these articles on RSocket.

RSocket Load Balancing:

RSocket is a binary message-passing protocol for client-server application development that supports Reactive Streams. It establishes a persistent connection between the client and the server (like gRPC). It is a good choice for communication between microservices. However, the persistent connection makes load balancing a challenge: once a connection is established, every request sent over it goes to the same server instance.
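
To see why a persistent connection complicates load balancing, here is a minimal plain-Java sketch. No RSocket is involved, servers are just array indices, and the class and method names are hypothetical. It compares choosing a server once per connection with choosing one per request, and counts how many requests each of three servers would receive.

```java
import java.util.Arrays;

// Hypothetical illustration only: servers are array indices, there is no networking.
class StickyVsPerRequest {

    // One persistent connection: the server is chosen once, then every request uses it.
    static int[] perConnection(int servers, int requests) {
        int[] hits = new int[servers];
        int chosen = 0; // whichever server the connection happened to land on
        for (int i = 0; i < requests; i++) {
            hits[chosen]++;
        }
        return hits;
    }

    // Client-side load balancing: a server is chosen again for every request.
    static int[] perRequest(int servers, int requests) {
        int[] hits = new int[servers];
        for (int i = 0; i < requests; i++) {
            hits[i % servers]++;
        }
        return hits;
    }

    public static void main(String[] args) {
        System.out.println("per connection: " + Arrays.toString(perConnection(3, 9)));
        System.out.println("per request   : " + Arrays.toString(perRequest(3, 9)));
    }
}
```

With 9 requests and 3 servers, the first variant sends all 9 requests to one server, while the second sends 3 to each.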

Let's see how we can achieve RSocket Load Balancing when we have a pool of servers.

High Level Setup:

Let's consider a simple client-server application in which 3 instances of the server-app are running. The client wants to send multiple requests and distribute the load across all the server instances. It periodically gets the server instance details from a service registry (such as Consul or Eureka) and sends the requests accordingly.

[Figure: RSocket load balancing]

Project Setup:

Create a Spring Boot application with the RSocket and Lombok dependencies.

I created a multi-module Maven project as shown here.

Server Side:

  • The server-app exposes an endpoint as shown here. It is simple and nothing else is required for this demo.
@Controller
public class ServerController {

    @MessageMapping("square-calculator")
    public Mono<Integer> square(Mono<Integer> input){
        return input
                .doOnNext(i -> System.out.println("Received : " + i))
                .delayElement(Duration.ofMillis(500))
                .map(i -> i * i);
    }

}
  • Package the application with this command.
mvn clean package
  • Run 3 instances of the app as shown here (run these commands from the server-app/target directory).
java -Dspring.rsocket.server.port=6565 -jar server-app-0.0.1-SNAPSHOT.jar
java -Dspring.rsocket.server.port=6566 -jar server-app-0.0.1-SNAPSHOT.jar
java -Dspring.rsocket.server.port=6567 -jar server-app-0.0.1-SNAPSHOT.jar

Client Side:

  • I assume the client will be provided with the list of server instance details somehow (in real life you can get this from a service registry). For this demo I use this application.yaml.
rsocket:
  square-service:
    servers:
      - host: localhost
        port: 6565
      - host: localhost
        port: 6566
      - host: localhost
        port: 6567
  • I create a simple model class to get the above details as shown here.
// Lombok's @Data already generates toString(), so a separate @ToString is not needed
@Data
public class RSocketServerInstance {
    private String host;
    private int port;
}
  • Service Registry
    • Here I assume you use something like Consul or Eureka, from which you can fetch the dynamic list of IP addresses of the server instances for a given service name.
    • In our case, we run 3 instances of the server-app, and on every call I exclude 1 of them to simulate an unhealthy instance.
@Service
@ConfigurationProperties(prefix = "rsocket.square-service")
public class DummyServiceRegistry {

    private List<RSocketServerInstance> servers;
    private AtomicInteger atomicInteger = new AtomicInteger(0);

    public void setServers(List<RSocketServerInstance> servers) {
        this.servers = servers;
    }

    // we exclude 1 instance on every call to simulate that it is not available
    public List<RSocketServerInstance> getServers() {
        // read the counter once so every index is compared against the same value
        int excluded = atomicInteger.incrementAndGet() % servers.size();
        return IntStream.range(0, servers.size())
                        .filter(i -> i != excluded)
                        .mapToObj(servers::get)
                        .collect(Collectors.toList());
    }

}
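
To make the rotation concrete, the following plain-Java sketch reproduces the registry's counter-and-filter logic without Spring. It uses the port numbers from application.yaml as stand-ins for the server instances. The class name RotationDemo is hypothetical and exists only for this illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical stand-alone version of the dummy registry's rotation logic.
class RotationDemo {

    private final List<Integer> ports = List.of(6565, 6566, 6567);
    private final AtomicInteger counter = new AtomicInteger(0);

    // Each call advances the counter and drops the instance at index (counter % 3).
    List<Integer> getServers() {
        int excluded = counter.incrementAndGet() % ports.size();
        return IntStream.range(0, ports.size())
                .filter(i -> i != excluded)
                .mapToObj(ports::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        RotationDemo registry = new RotationDemo();
        for (int call = 1; call <= 3; call++) {
            System.out.println("call " + call + " -> " + registry.getServers());
        }
    }
}
```

The first three calls return [6565, 6567], [6565, 6566] and [6566, 6567], after which the pattern repeats. Every instance is therefore missing from one out of every three snapshots.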
  • Load Balance Target
    • The class above simulates a service registry.
    • Once we get the addresses of the healthy instances, we create a List<LoadbalanceTarget>.
    • We periodically query the service registry and expose the results as a Flux<List<LoadbalanceTarget>>, a never-ending stream.
@Configuration
public class LoadBalanceTargetConfig {

    @Autowired
    private DummyServiceRegistry serviceRegistry;

    @Bean
    public Flux<List<LoadbalanceTarget>> targets(){
        return Mono.fromSupplier(() -> serviceRegistry.getServers())
                .repeatWhen(longFlux -> longFlux.delayElements(Duration.ofSeconds(2)))
                .map(this::toLoadBalanceTarget);
    }

    private List<LoadbalanceTarget> toLoadBalanceTarget(List<RSocketServerInstance> rSocketServers){
        return rSocketServers.stream()
                // the key identifies the target; "host:port" keeps it unambiguous
                .map(server -> LoadbalanceTarget.from(
                        server.getHost() + ":" + server.getPort(),
                        TcpClientTransport.create(server.getHost(), server.getPort())))
                .collect(Collectors.toList());
    }

}
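
Conceptually, the Flux above behaves like a poller that asks the registry for a fresh snapshot every 2 seconds and makes the latest list available to the consumer. The sketch below shows that idea with a JDK ScheduledExecutorService instead of Reactor. It is an analogy under that assumption, not how Reactor or RSocket implement it, and SnapshotPoller and its methods are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical JDK-only analogy for "poll the registry, keep the latest snapshot".
class SnapshotPoller {

    private final Supplier<List<String>> registry;
    private final AtomicReference<List<String>> snapshot = new AtomicReference<>(List.of());

    SnapshotPoller(Supplier<List<String>> registry) {
        this.registry = registry;
    }

    // One polling step: ask the registry and replace the previous snapshot.
    void refresh() {
        snapshot.set(List.copyOf(registry.get()));
    }

    // The most recent snapshot; empty until the first refresh has run.
    List<String> latest() {
        return snapshot.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in registry that always reports the same two instances.
        SnapshotPoller poller = new SnapshotPoller(() -> List.of("localhost:6565", "localhost:6567"));
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Poll immediately, then again 2 seconds after each poll completes.
        scheduler.scheduleWithFixedDelay(poller::refresh, 0, 2, TimeUnit.SECONDS);
        Thread.sleep(500);
        System.out.println("latest targets: " + poller.latest());
        scheduler.shutdownNow();
    }
}
```

The design point carried over from the Flux version is that consumers never see a fixed list: each poll replaces the previous snapshot, so an instance that drops out of the registry disappears from the targets on the next refresh.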
  • RSocket Configuration
    • Once we have the Flux<List<LoadbalanceTarget>>, we build the RSocketRequester with that flux as shown here.
@Configuration
public class RSocketConfig {

    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .build();
    }

    @Bean
    public RSocketRequester rSocketClient(RSocketRequester.Builder builder, Flux<List<LoadbalanceTarget>> targetFlux){
        return builder.transports(targetFlux, new RoundRobinLoadbalanceStrategy());
    }

}
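
What round-robin selection means when the target list keeps changing can be sketched in a few lines of plain Java. This is only an illustration of the idea. It is not the code of RoundRobinLoadbalanceStrategy, and the class RoundRobinPicker is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of round-robin selection over the current target list.
class RoundRobinPicker {

    private final AtomicInteger next = new AtomicInteger(0);

    // Picks the next target in rotation from whatever list is current right now.
    String pick(List<String> targets) {
        if (targets.isEmpty()) {
            throw new IllegalStateException("no targets available");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), targets.size());
        return targets.get(index);
    }

    public static void main(String[] args) {
        RoundRobinPicker picker = new RoundRobinPicker();
        List<String> targets = List.of("localhost:6565", "localhost:6566");
        for (int i = 0; i < 4; i++) {
            System.out.println("request " + i + " -> " + picker.pick(targets));
        }
        // The registry now reports a different pair; the rotation simply continues.
        targets = List.of("localhost:6566", "localhost:6567");
        System.out.println("request 4 -> " + picker.pick(targets));
    }
}
```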
  • That’s it! The RSocketRequester is now ready to use and will balance the load across the given instances.
@SpringBootApplication
public class RSocketClientApplication implements CommandLineRunner {

    @Autowired
    private RSocketRequester rSocketRequester;

    public static void main(String[] args) {
        SpringApplication.run(RSocketClientApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        Flux.range(1, 10000)
            .delayElements(Duration.ofMillis(100))
            .flatMap(i -> rSocketRequester.route("square-calculator").data(i).retrieveMono(Integer.class).retry(1))
            .doOnNext(i -> System.out.println("Response : " + i))
            .blockLast();
    }
}
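
The retry(1) call in the client resubscribes once when a request fails. My assumption is that it is there to cover a request that was routed to an instance which is no longer available. A blocking plain-Java analogy of "try, and on failure try exactly once more" might look like this. RetryOnce is a hypothetical helper, not part of Reactor or RSocket.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical blocking analogy for "try, and on failure try exactly once more".
class RetryOnce {

    static <T> T call(Supplier<T> request) {
        try {
            return request.get();
        } catch (RuntimeException firstFailure) {
            // Second and final attempt; a failure here propagates to the caller.
            return request.get();
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Stand-in request: fails on the first attempt, succeeds on the second.
        int result = call(() -> {
            if (attempts.incrementAndGet() == 1) {
                throw new IllegalStateException("instance unavailable");
            }
            return 7 * 7;
        });
        System.out.println("Response : " + result + " after " + attempts.get() + " attempts");
    }
}
```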

When we run the client side application, we can see that RSocket Load Balancing happens across all the server instances.

Summary:

We were able to successfully demonstrate RSocket Load Balancing from the client side. Special thanks to RSocket maintainer Oleg for clarifying my issues on this topic!

Read more about RSocket.

The source code is available here.

Happy learning 🙂

 


2 thoughts on “RSocket Load Balancing – Client Side”

  1. Thanks for the article! I have a question. What should we do with RSocket when we are on Kubernetes? We do not know in advance the number of instances or their location; we only have a single host name, such as tcp://some-service, that hides an unknown number of pods behind it.

    1. In Kubernetes, we can get the list of pod IPs behind the service proxy. However, I am not saying that this is what we should do. This is where a service registry like Consul or Eureka could be used.
