RSocket With Spring Boot

Overview:

In this tutorial, I would like to show you how to integrate RSocket with Spring Boot. If you are new to RSocket, take a look at this article to learn more.

RSocket Interaction Models:

RSocket is a message passing protocol for multiplexed, duplex communication which supports TCP, WebSockets and Aeron (UDP). It supports following interaction models:

Interaction Model Behavior / Usage
fire-and-forget No response required
request-and-response Unary / Traditional request and response model
one-to-one
request-response-stream Send one message and gets multiple streaming response back
one-to-many
channel bi-directional stream. Continuous message exchange
many-to-many

Sample Application:

Lets consider a simple movie theater application to demo RSocket & its interaction models .

  • A ticket purchase request would be sent. The application would respond back with ISSUED status.
    • request-and-response

 

  • A ticket cancel request would be sent. The application would process the refund asynchronously.
    • fire-and-forget

 

  • A play movie request would be sent with a valid ticket. The application will respond back with list of scenes to be played
    • request-response-stream

 

We will have a separate controller class for TV.

  • A play movie request would be sent with the list of scenes to be played. The application will respond with corresponding scenes.
    • channel / bi-directional stream

RSocket With Spring Boot – Project Setup:

Lets create a Spring Boot project with these dependencies.

rsocket spring boot

Models/DTOs:

We will have following models.

  • TicketStatus
public enum TicketStatus {

    TICKET_PENDING,
    TICKET_ISSUED,
    TICKET_CANCELLED;

}
  • TicketRequest
@Data
@NoArgsConstructor
public class TicketRequest {

    private UUID requestId;
    private TicketStatus status = TicketStatus.TICKET_PENDING;

    public TicketRequest(UUID requestId) {
        this.requestId = requestId;
    }
}
  • MovieScene
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MovieScene {

    private int sceneId;
    private String sceneDescription;

}

Movie Service:

We have below service class which has list of scenes to be played for a movie.

@Service
public class MovieService {

    private final List<MovieScene> scenes =  List.of(
            new MovieScene(1, "Scene 1"),
            new MovieScene(2, "Scene 2"),
            new MovieScene(3, "Scene 3"),
            new MovieScene(4, "Scene 4"),
            new MovieScene(5, "Scene 5")
    );

    public List<MovieScene> getScenes(){
        return this.scenes;
    }

    public MovieScene getScene(int index){
        return this.scenes.get(index);
    }

}

RSocket With Spring Boot – Server Side:

RSocket is a message passing protocol. Spring Boot does all the heavy lifting by simply letting us annotate with message handler names as shown here.

  • fire-and-forget
    • Here we assume that cancel request would be sent to the “ticket.cancel” handler. The name can be anything. [supports alpha numeric and .]
    • Note that we do not return anything.
@MessageMapping("ticket.cancel")
public void cancelTicket(Mono<TicketRequest> request){
    // cancel and refund asynchronously
    request
            .doOnNext(t -> t.setStatus(TicketStatus.TICKET_CANCELLED))
            .doOnNext(t -> System.out.println("cancelTicket :: " + t.getRequestId() + " : " + t.getStatus()))
            .subscribe();
}
  • request-response
    • We change the incoming ticket status from pending to ISSUED and respond.
@MessageMapping("ticket.purchase")
public Mono<TicketRequest> purchaseTicket(Mono<TicketRequest> request){
    return request
            .doOnNext(t -> t.setStatus(TicketStatus.TICKET_ISSUED))
            .doOnNext(t -> System.out.println("purchaseTicket :: " + t.getRequestId() + " : " + t.getStatus()));

}
  • request-response-stream
    • We first check if the request has a valid ticket issued
    • Then we stream the movie scenes to be played
@MessageMapping("movie.stream")
public Flux<MovieScene> playMovie(Mono<TicketRequest> request){
    return request
            .map(t -> t.getStatus().equals(TicketStatus.TICKET_ISSUED) ? this.movieService.getScenes() : Collections.emptyList())
            .flatMapIterable(Function.identity())
            .cast(MovieScene.class)
            .delayElements(Duration.ofSeconds(1));
}
  • bi-directional stream
    • Here we assume the users uses his TV. He randomly chooses the scenes to be played.
    • We stream the corresponding scene back to the user.
@MessageMapping("tv.movie")
public Flux<MovieScene> playMovie(Flux<Integer> sceneIndex){
    return sceneIndex
            .map(index -> index - 1) // list is 0 based index
            .map(this.movieService::getScene)
            .delayElements(Duration.ofSeconds(1));
}
  • Spring boot automatically starts the rsocket server based on this application property.
spring.rsocket.server.port=6565

RSocket With Spring Boot – Client Side:

  • RSocket Configuration
@Configuration
public class RSocketConfig {

    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .build();
    }

    @Bean
    public Mono<RSocketRequester> getRSocketRequester(RSocketRequester.Builder builder){
        return builder
                .rsocketConnector(rSocketConnector -> rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
                .dataMimeType(MediaType.APPLICATION_CBOR)
                .connect(TcpClientTransport.create(6565));
    }

}
  • RSocketRequester bean is the one to be autowired /  to be used by the client.
  • RSocketRequester has route method which is used to send the request to the specific handler we saw above.
  • Check this below – cancel ticket test
    • We send a TicketRequest to the “ticket.cancel” handler
@Test
public void ticketCancel(){
    Mono<Void> mono = this.rSocketRequester
            .map(r -> r.route("ticket.cancel").data(new TicketRequest(UUID.randomUUID())))
            .flatMap(RSocketRequester.RetrieveSpec::send);

    StepVerifier.create(mono)
            .verifyComplete();
}
  • ticket purchase request test
@Autowired
private Mono<RSocketRequester> rSocketRequester;

@Test
public void ticketPurchase(){
    Mono<TicketRequest> ticketRequestMono = this.rSocketRequester
            .map(r -> r.route("ticket.purchase").data(new TicketRequest(UUID.randomUUID())))
            .flatMap(r -> r.retrieveMono(TicketRequest.class))
            .doOnNext(r -> System.out.println(r.getRequestId() + ":" + r.getStatus()));
    StepVerifier.create(ticketRequestMono)
            .expectNextMatches(t -> t.getStatus().equals(TicketStatus.TICKET_ISSUED))
            .verifyComplete();
}
  • play movie – request-response stream –  test
@Test
public void playMovie(){
    Mono<TicketRequest> ticketRequestMono = this.rSocketRequester
            .map(r -> r.route("ticket.purchase").data(new TicketRequest(UUID.randomUUID())))
            .flatMap(r -> r.retrieveMono(TicketRequest.class));

    Flux<MovieScene> sceneFlux = this.rSocketRequester
            .zipWith(ticketRequestMono)
            .map(tuple -> tuple.getT1().route("movie.stream").data(tuple.getT2()))
            .flatMapMany(r -> r.retrieveFlux(MovieScene.class))
            .doOnNext(m -> System.out.println("Playing : " + m.getSceneDescription()));

    // assert all the movie scenes
    StepVerifier.create(sceneFlux)
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 1"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 2"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 3"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 4"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 5"))
            .verifyComplete();
}
  • tv movie play – bi-directional stream
@Test
public void tvPlayMovie(){
    Flux<Integer> movieSceneFlux = Flux.just(1, 2, 2, 1, 2, 3, 3, 4, 5);
    Flux<MovieScene> tvFlux = this.rSocketRequester
            .map(r -> r.route("tv.movie").data(movieSceneFlux))
            .flatMapMany(r -> r.retrieveFlux(MovieScene.class))
            .doOnNext(m -> System.out.println("TV : " + m.getSceneDescription()));
    StepVerifier.create(tvFlux)
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 1"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 2"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 2"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 1"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 2"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 3"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 3"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 4"))
            .expectNextMatches(m -> m.getSceneDescription().equals("Scene 5"))
            .verifyComplete();
}

Summary:

We were able to successfully integrate RSocket with Spring Boot. Passing messages between client and server is very easy with Spring Boot. Serialization and Deserialization happen automatically for us. Based on the method parameter and return type – Spring Boot automatically finds what kind of interaction model to use.

Learn more about RSocket:

RSocket File Upload Example

 

The source code is available here.

Happy learning 🙂

 

Share This:

2 thoughts on “RSocket With Spring Boot

  1. great article,

    just a question, ip and port in config is at startup, how to handle say for example you have 3 replica pods. we can get the ip of the pods, but what if the pod restart, so ip changed.

    1. In real life we would not depend on IP. You can use the service name or some sort of client side load balancer / service discovery like consul etc.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.