
RSocket Load Balancing – Client Side

Overview:

In this tutorial, I would like to demonstrate client-side load balancing with Spring RSocket.

If you are new to RSocket, take a look at these articles on RSocket.

RSocket Load Balancing:

RSocket is a binary message-passing protocol for client-server application development that supports Reactive Streams. It establishes a persistent connection between the client and the server (like gRPC). It is a good choice for communication between microservices. However, the persistent connection makes load balancing a challenge: a load balancer that distributes connections rather than individual requests sends every request on an established connection to the same server instance.
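
To make the problem concrete, here is a minimal plain-Java simulation with no networking involved. The class and method names are hypothetical and only illustrate the idea: when a server is chosen once per connection, every request on that connection lands on the same instance, whereas choosing a server per request spreads the load.

```java
import java.util.Arrays;

// Hypothetical simulation: servers are just array indexes, nothing is sent over the wire.
class PersistentConnectionDemo {

    // Requests per server when the server is picked once, at connect time,
    // and the same persistent connection is reused for every request.
    static int[] connectionLevel(int servers, int requests) {
        int[] counts = new int[servers];
        int chosen = 0; // decided once, when the connection is opened
        for (int i = 0; i < requests; i++) {
            counts[chosen]++;
        }
        return counts;
    }

    // Requests per server when a server is picked for every request (round robin).
    static int[] requestLevel(int servers, int requests) {
        int[] counts = new int[servers];
        for (int i = 0; i < requests; i++) {
            counts[i % servers]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(connectionLevel(3, 9))); // [9, 0, 0]
        System.out.println(Arrays.toString(requestLevel(3, 9)));    // [3, 3, 3]
    }
}
```

Client-side load balancing aims for the second behaviour: the client knows all the instances and decides per request where to send it.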

Let's see how we can achieve RSocket load balancing when we have a pool of servers.

High Level Setup:

Let's consider a simple client-server application in which 3 instances of the server-app are running. The client wants to send multiple requests and distribute the load across all the server instances. It periodically fetches the server instance details from a service registry (such as Consul or Eureka) and sends the requests accordingly.
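
The idea can be sketched in plain Java as follows. This is only an illustration under my own assumptions, not how the RSocket library implements it: RefreshableRoundRobin, refresh and pick are hypothetical names, and servers are represented as simple host:port strings.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: round-robin selection over a target list that can be
// replaced at any time, for example after polling a service registry.
class RefreshableRoundRobin {

    private volatile List<String> targets = List.of();
    private final AtomicInteger next = new AtomicInteger();

    // Called whenever the registry reports the current set of instances.
    void refresh(List<String> latest) {
        targets = List.copyOf(latest);
    }

    // Picks the next target from the most recent snapshot.
    String pick() {
        List<String> snapshot = targets;
        if (snapshot.isEmpty()) {
            throw new IllegalStateException("no server instances available");
        }
        // floorMod keeps the index valid even if the counter overflows.
        return snapshot.get(Math.floorMod(next.getAndIncrement(), snapshot.size()));
    }

    public static void main(String[] args) {
        RefreshableRoundRobin lb = new RefreshableRoundRobin();
        lb.refresh(List.of("localhost:6565", "localhost:6566", "localhost:6567"));
        for (int i = 0; i < 4; i++) {
            System.out.println(lb.pick()); // 6565, 6566, 6567, then 6565 again
        }
        // The registry now reports that 6566 is gone.
        lb.refresh(List.of("localhost:6565", "localhost:6567"));
        System.out.println(lb.pick()); // localhost:6565
        System.out.println(lb.pick()); // localhost:6567
    }
}
```

Because each pick works on a snapshot of the list, a refresh never interrupts a selection that is already in progress.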

Project Setup:

Create a Spring Boot application with the RSocket and Lombok dependencies.

I created a multi-module Maven project as shown here.

Server Side:

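The server exposes a single request-response route that returns the square of the given number:

import java.time.Duration;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;

@Controller
public class ServerController {

    // Logs the input, waits 500 ms to simulate work, then emits the square.
    @MessageMapping("square-calculator")
    public Mono<Integer> square(Mono<Integer> input){
        return input
                .doOnNext(i -> System.out.println("Received : " + i))
                .delayElement(Duration.ofMillis(500))
                .map(i -> i * i);
    }

}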

Build the project and start 3 instances of the server on different ports:

mvn clean package
java -jar -Dspring.rsocket.server.port=6565 server-app-0.0.1-SNAPSHOT.jar
java -jar -Dspring.rsocket.server.port=6566 server-app-0.0.1-SNAPSHOT.jar
java -jar -Dspring.rsocket.server.port=6567 server-app-0.0.1-SNAPSHOT.jar

Client Side:

The client's configuration lists the 3 server instances:

rsocket:
  square-service:
    servers:
      - host: localhost
        port: 6565
      - host: localhost
        port: 6566
      - host: localhost
        port: 6567
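
Each entry is bound to a simple class:

import lombok.Data;

// @Data already generates toString(), so a separate @ToString is not needed.
@Data
public class RSocketServerInstance {
    private String host;
    private int port;
}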
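
A dummy service registry stands in for Consul or Eureka. Each call leaves out one instance to simulate a server becoming unavailable:

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Service;

@Service
@ConfigurationProperties(prefix = "rsocket.square-service")
public class DummyServiceRegistry {

    private List<RSocketServerInstance> servers;
    private final AtomicInteger counter = new AtomicInteger(0);

    public void setServers(List<RSocketServerInstance> servers) {
        this.servers = servers;
    }

    // we exclude 1 instance every time to simulate something is not available
    public List<RSocketServerInstance> getServers() {
        // Read the counter once so that every index is compared against the same value.
        // floorMod keeps the result non-negative if the counter ever overflows.
        int excluded = Math.floorMod(counter.incrementAndGet(), servers.size());
        return IntStream.range(0, servers.size())
                        .filter(i -> i != excluded)
                        .mapToObj(servers::get)
                        .collect(Collectors.toList());
    }

}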
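
To see what the registry returns over time, the exclusion rule can be reproduced without Spring. The sketch below is a hypothetical stand-alone helper (ExclusionPatternDemo and visibleOnCall are my names, not part of the project) that works on port numbers only and assumes the 3 ports used in this tutorial.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper: mirrors the registry's rule "leave out index (call number % 3)".
class ExclusionPatternDemo {

    private static final List<Integer> PORTS = List.of(6565, 6566, 6567);

    // Ports returned on the n-th call (1-based).
    static List<Integer> visibleOnCall(int n) {
        int excluded = n % PORTS.size();
        return IntStream.range(0, PORTS.size())
                .filter(i -> i != excluded)
                .mapToObj(PORTS::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (int call = 1; call <= 3; call++) {
            System.out.println("call " + call + " -> " + visibleOnCall(call));
        }
        // call 1 -> [6565, 6567]
        // call 2 -> [6565, 6566]
        // call 3 -> [6566, 6567]
    }
}
```

Each instance is therefore missing from one call out of three, and the pattern repeats from the fourth call onwards.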
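
The registry is polled every 2 seconds and each result is converted into a list of LoadbalanceTarget objects:

import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Configuration
public class LoadBalanceTargetConfig {

    @Autowired
    private DummyServiceRegistry serviceRegistry;

    // Emits the current server list immediately, then again every 2 seconds.
    @Bean
    public Flux<List<LoadbalanceTarget>> targets(){
        return Mono.fromSupplier(() -> serviceRegistry.getServers())
                .repeatWhen(longFlux -> longFlux.delayElements(Duration.ofSeconds(2)))
                .map(this::toLoadBalanceTarget);
    }

    // The key identifies a target, so host and port are joined with ':' to keep it unambiguous.
    private List<LoadbalanceTarget> toLoadBalanceTarget(List<RSocketServerInstance> rSocketServers){
        return rSocketServers.stream()
                .map(server -> LoadbalanceTarget.from(
                        server.getHost() + ":" + server.getPort(),
                        TcpClientTransport.create(server.getHost(), server.getPort())))
                .collect(Collectors.toList());
    }

}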
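
The RSocketRequester is built from the target flux with a round-robin strategy:

import java.util.List;

import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.loadbalance.RoundRobinLoadbalanceStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import reactor.core.publisher.Flux;

@Configuration
public class RSocketConfig {

    // Payloads are encoded and decoded as CBOR.
    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .build();
    }

    // The requester picks a target from the latest list for each request, in round-robin order.
    @Bean
    public RSocketRequester rSocketClient(RSocketRequester.Builder builder, Flux<List<LoadbalanceTarget>> targetFlux){
        return builder.transports(targetFlux, new RoundRobinLoadbalanceStrategy());
    }

}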
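
The client sends 10000 requests, one every 100 ms, and retries a failed request once:

import java.time.Duration;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.rsocket.RSocketRequester;
import reactor.core.publisher.Flux;

@SpringBootApplication
public class RSocketClientApplication implements CommandLineRunner {

    @Autowired
    private RSocketRequester rSocketRequester;

    public static void main(String[] args) {
        SpringApplication.run(RSocketClientApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        Flux.range(1, 10000)
            .delayElements(Duration.ofMillis(100))
            // retry(1) resubscribes once if a request fails
            .flatMap(i -> rSocketRequester.route("square-calculator")
                                          .data(i)
                                          .retrieveMono(Integer.class)
                                          .retry(1))
            .doOnNext(i -> System.out.println("Response : " + i))
            .blockLast();
    }
}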

When we run the client-side application, we can see that the requests are load balanced across all the server instances.

Summary:

We were able to successfully demonstrate RSocket load balancing on the client side. Special thanks to RSocket maintainer Oleg for clarifying my issues on this topic!

Read more about RSocket.

The source code is available here.

Happy learning 🙂

 
