gRPC Bidirectional Streaming

Overview:

In this tutorial, I would like to show you how to implement gRPC Bidirectional Streaming API in Java.

I assume that you have a basic understanding of what gRPC is. If not, read the below articles first.

  1. Protocol Buffers – A Simple Introduction
  2. gRPC – An Introduction Guide
  3. gRPC Unary API In Java – Easy Steps
  4. gRPC Server Streaming API In Java
  5. gRPC Client Streaming API In Java

gRPC Bidirectional Streaming:

In the gRPC Bidirectional streaming API, the client and server can exchange multiple messages between them via a single TCP connection. These message exchanges can be completely independent of each other. The client and server can close the call when they are done with the message exchanges.

grpc bidirectional streaming

For ex: Consider these application behaviors. They are good example of client-server bidirectional streaming.

  • Google search screen: As soon as you enter some keywords, it is sent to the server and the server immediately responds with possible search keywords.
  • Netflix/YouTube: Based on the videos you search/watch, you get more suggestions related to that.

Sample Application:

For this tutorial, We are going to create an interesting GPS for your car! That is – you would like to travel from point A to point B which is 100 units apart. Once you start driving, we would be syncing with the server every 3 seconds and tracking your position (in this case based on the distance you traveled in every 3 seconds). Our server will respond with remaining distance to the destination and approximate time taken to reach the destination until the trip is completed!

 

Protobuf – Service Definition:

Let’s create a service definition for this. navigate is going to be the method to be implemented on the server side. This service definition shows what type of input to be sent and what type of output to expect. We use the stream keyword for both input & output to indicate that it is going to be a bidirectional streaming request/response.

syntax = "proto3";

package gps;

option java_package = "com.vinsguru.gps";
option java_multiple_files = true;

message TripRequest {
  int32 distanceTravelled = 1;
}

message TripResponse {
  int32 remainingDistance = 1;
  int32 timeToDestination = 2;
}

service NavigationService {
  // grpc bidirectional stream
  rpc navigate(stream TripRequest) returns (stream TripResponse);
}

When we issue the below maven command, maven automatically creates the client and server side code using protoc tool.

mvn clean compile

NavigatorServiceImplBase class in the below picture is auto-generated abstract class which needs to be implemented by the server for the above service definition. Similarly NavigatorServiceStub is the actual implementation class which client should use to make a request

gRPC Bidirectional Streaming – Server Side:

Service Implementation: Let’s extend the abstract NavigatorServiceImplBase to add our implementation to respond to the navigate call. The server can expect multiple input objects from client as it is a streaming request. When the client calls onCompleted, it notifies the server that it has reached the destination. At that time server can also close the call.

  • When we first start, we initialize the distance as 100 units, we set the start time etc.
  • Every 3 seconds, the client will send the units it traveled in 3 seconds window as part of the onNext call.
  • When we receive the onNext call, we check the  total units traveled by the client and calculate the remaining distance.
  • We also the know the speed of the client. (Distance traveled / Time taken). Using this speed, we also calculate the approximate time taken to reach the destination.
public class TripRequestObserver implements StreamObserver<TripRequest> {

    private final int totalDistance = 100;
    private LocalTime startTime = LocalTime.now();
    private int distanceTraveled;
    private final StreamObserver<TripResponse> tripResponseStreamObserver;

    public TripRequestObserver(StreamObserver<TripResponse> tripResponseStreamObserver) {
        this.tripResponseStreamObserver = tripResponseStreamObserver;
    }

    @Override
    public void onNext(TripRequest tripRequest) {
        this.distanceTraveled = Math.min(totalDistance, (this.distanceTraveled + tripRequest.getDistanceTravelled()));
        int remainingDistance = Math.max(0, (totalDistance - distanceTraveled));

        // the client has reached destination
        if(remainingDistance == 0){
            this.tripResponseStreamObserver.onNext(TripResponse.getDefaultInstance());
            return;
        }

        // client has not yet reached destination
        long elapsedDuration = Duration.between(this.startTime, LocalTime.now()).getSeconds();
        elapsedDuration = elapsedDuration < 1 ? 1 : elapsedDuration;
        double currentSpeed = (distanceTraveled * 1.0d) / elapsedDuration;
        int timeToReach = (int) (remainingDistance / currentSpeed);
        TripResponse tripResponse = TripResponse.newBuilder()
                .setRemainingDistance(remainingDistance)
                .setTimeToDestination(timeToReach)
                .build();
        this.tripResponseStreamObserver.onNext(tripResponse);
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {
        this.tripResponseStreamObserver.onCompleted();
        System.out.println("Client reached safely");
    }

}

The above implementation is the actual business logic for our use case which needs to be executed whenever the client initiates a new navigate request.

public class NavigationService extends NavigationServiceGrpc.NavigationServiceImplBase {

    @Override
    public StreamObserver<TripRequest> navigate(StreamObserver<TripResponse> responseObserver) {
        return new TripRequestObserver(responseObserver);
    }

}

Once the service implementation is done, Let’s add it to the server to serve the client calls. We are listening on port 6565. Start this server by invoking the main method.

public class GPSServer {

    public static void main(String[] args) throws IOException, InterruptedException {

        // build gRPC server
        Server server = ServerBuilder.forPort(6565)
                .addService(new NavigationService())
                .build();

        // start
        server.start();

        // shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("GPS is shutting down!");
            server.shutdown();
        }));

        server.awaitTermination();

    }

}

Now our server is ready, up and running!

gRPC Bidirectional Streaming – Client Side:

Protobuf already has generated the client library. As a first step to make this request, we need to have an implementation for the StreamObserver.

  • As part of the drive method, we would travel for 3 seconds and let the server know the units traveled in 3 seconds. We keep doing this again and again until we reach the destination.
  • When we reach the destination, we let the server know that, this call can be closed.
public class TripResponseStreamObserver implements StreamObserver<TripResponse> {

    private StreamObserver<TripRequest> requestStreamObserver;

    @Override
    public void onNext(TripResponse tripResponse) {
       if(tripResponse.getRemainingDistance() > 0){
           print(tripResponse);
           this.drive();
       }else{
           this.requestStreamObserver.onCompleted();
       }
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {
        System.out.println("Trip Completed");
    }

    public void startTrip(StreamObserver<TripRequest> requestStreamObserver){
        this.requestStreamObserver = requestStreamObserver;
        this.drive();
    }

    private void drive(){
        Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
        TripRequest tripRequest = TripRequest.newBuilder().setDistanceTravelled(ThreadLocalRandom.current().nextInt(1, 10)).build();
        requestStreamObserver.onNext(tripRequest);
    }

    private void print(TripResponse tripResponse){
        System.out.println(LocalTime.now() + ": Remaining Distance : " + tripResponse.getRemainingDistance());
        System.out.println(LocalTime.now() + ": Time To Reach (sec): " + tripResponse.getTimeToDestination());
        System.out.println("------------------------------");
    }

}

The client has to do the following to make a request and receive the response.

  • Creating channel: The client has to create a channel/connection with the back-end server first.
  • Stub: The client will use a non-blocking stub to make a request by passing the required parameters.

To demo this, I am going to create a simple JUnit test class to act like a gRPC client. Do note that client can be anything. It could even be another microservice.

public class BiDirectionalStreamingTest {

    private ManagedChannel channel;
    private NavigationServiceGrpc.NavigationServiceStub clientStub;

    @Before
    public void setup(){
        this.channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext()
                .build();
        this.clientStub = NavigationServiceGrpc.newStub(channel);
    }

    @Test
    public void tripTest() throws InterruptedException {
        TripResponseStreamObserver tripResponseStreamObserver = new TripResponseStreamObserver();
        StreamObserver<TripRequest> requestStreamObserver = this.clientStub.navigate(tripResponseStreamObserver);
        tripResponseStreamObserver.startTrip(requestStreamObserver);
    }

    @After
    public void teardown(){
        this.channel.shutdown();
    }

}

Now we are ready to drive!

Demo:

When I run the test, I am able to see output as shown here.

...
14:25:03.214192: Remaining Distance : 23
14:25:03.214291: Time To Reach (sec): 12
------------------------------
14:25:06.217532: Remaining Distance : 15
14:25:06.217628: Time To Reach (sec): 7
------------------------------
14:25:09.220906: Remaining Distance : 7
14:25:09.221105: Time To Reach (sec): 3
------------------------------
14:25:12.224692: Remaining Distance : 5
14:25:12.224855: Time To Reach (sec): 2
------------------------------
Trip Completed

gRPC Course:

I learnt gRPC + Protobuf in a hard way. But you can learn them quickly on Udemy. Yes, I have created a separate step by step course on Protobuf + gRPC along with Spring Boot integration for the next generation Microservice development. Click here for the special link.


Summary:

We were able to successfully demonstrate the gRPC Bidirectional Streaming API.

Learn more about gRPC.

The source code is available here.

Happy learning 🙂

 

Share This:

1 thought on “gRPC Bidirectional Streaming

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.