RSocket Client Responders

Overview:

In this tutorial, I would like to show you RSocket Client Responders through which a Server can callback the client for any requests/updates.

If you are new to RSocket, take a look at these articles on RSocket.

RSocket Client Responders:

RSocket is a binary message passing protocol for client server application development. It supports Reactive Streams & establishes a persistent connection between the client and the server. Once connected, any component can act like a client or server. For example, client will connect with server. A server can callback client for requesting some additional information.

Lets see how we can achieve this using RSocket Client Responders.

Sample Application:

Lets consider a sample application in which there is a client-app (A) which submits a task to the server (B) to complete. The server receives the request and providers a request id to the client as part of acknowledgement.

rsocket client responders

The server calls back the client at some time in future when it has some updates.Imagine this as your local Pizza place. You call the store and place your order. The food is delivered later to you at your door! Here the server which received the request acts like a client. It can request for information from app A or it can share some update with app A.

Project Setup:

Create a Spring Boot application with RSocket dependency.

I create a multi module maven project as shown here.

RSocket Server Side:

  • Request controller – to receive the incoming request and provide a request # as part of acknowledgement.
    • As part of @MessageMapping, Spring Boot can autowire the requester connection which can be used later for callback.
    • In this example, this is simple request and response model.
    • Imagine this as a receptionist who makes a note of your info. When your turn comes later, you will be notified.
@Controller
public class RequestController {

    @Autowired
    private RequestProcessor requestProcessor;

    @MessageMapping("place-request")
    public Mono<UUID> task(String request, RSocketRequester rSocketRequester){
        UUID uuid = UUID.randomUUID();
        this.requestProcessor.processRequests(rSocketRequester, uuid);
        return Mono.just(uuid);
    }

}
  • Request processor – this is asynchronous request processor & updates the client later via callback endpoint.
@Service
public class RequestProcessor {

    public void processRequests(RSocketRequester rSocketRequester, UUID uuid){
        Mono.just("Your request " + uuid + "  is completed")
                .delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(5, 10)))
                .flatMap(m -> rSocketRequester.route("request-status-callback").data(m).send())
                .subscribe();
    }

}

RSocket Client Side:

  • Callback controller
@Controller
public class CallbackController {

    // server will callback for any update
    @MessageMapping("request-status-callback")
    public void message(String message){
        System.out.println(LocalDateTime.now() + " :: " + message);
    }

}
  • Configuration
    • This is important for callback
@Configuration
public class RSocketConfig {


    @Bean
    public RSocketRequester getRSocketRequester(RSocketRequester.Builder builder, RSocketMessageHandler handler){
        return builder
                .rsocketConnector(rSocketConnector -> rSocketConnector.acceptor(handler.responder()))
                .tcp("localhost", 6565);
    }
    
}
  • Sending request
@SpringBootApplication
public class RSocketClientApplication implements CommandLineRunner {

    @Autowired
    private RSocketRequester rSocketRequester;

    public static void main(String[] args) {
        SpringApplication.run(RSocketClientApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {

        rSocketRequester.route("place-request") // message endpoint
                .data("some request") // data
                .retrieveMono(UUID.class) // request #
                .subscribe(s -> System.out.println(LocalDateTime.now() + " :: Request # " + s));

    }
}

Demo:

If I run the server and client applications, client-app sends a request and receives a callback as shown below.

2020-12-27T20:50:47.443969 :: Request # ba8f463a-668d-49e4-b48b-c042905d8575
2020-12-27T20:50:56.439707 :: Your request ba8f463a-668d-49e4-b48b-c042905d8575  is completed

Lets send requests in a loop as shown here.

for (int i = 0; i < 5; i++) {
    rSocketRequester.route("place-request") // message endpoint
            .data("some request") // data
            .retrieveMono(UUID.class) // request #
            .subscribe(s -> System.out.println(LocalDateTime.now() + " :: Request # " + s));
}

Output:

We submit 5 requests. The server provides the acknowledgement id immediately. The request completion time is random and it provides the update as soon as they are completed.

2020-12-27T20:54:22.096022 :: Request # 5de77e25-f020-411a-88dc-b203fa8e3154
2020-12-27T20:54:22.100045 :: Request # 31c1d3db-73d2-45d7-bce3-19120a21ce6f
2020-12-27T20:54:22.100249 :: Request # ccca359b-9306-4be3-a232-2240d987f79f
2020-12-27T20:54:22.100463 :: Request # db30a31a-c09d-4485-9475-351690c3241c
2020-12-27T20:54:22.100644 :: Request # 8a156217-3d7d-4764-a37a-bf500ea005df
2020-12-27T20:54:27.084350 :: Your request 31c1d3db-73d2-45d7-bce3-19120a21ce6f  is completed
2020-12-27T20:54:28.079920 :: Your request 8a156217-3d7d-4764-a37a-bf500ea005df  is completed
2020-12-27T20:54:30.067769 :: Your request 5de77e25-f020-411a-88dc-b203fa8e3154  is completed
2020-12-27T20:54:30.069056 :: Your request ccca359b-9306-4be3-a232-2240d987f79f  is completed
2020-12-27T20:54:30.070123 :: Your request db30a31a-c09d-4485-9475-351690c3241c  is completed

Summary:

We were able to successfully demonstrate RSocket Client Callback / RSocket Client Responders.

Read more about RSocket.

The source code is available here.

Happy learning 🙂

Share This:

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.