Site icon Vinsguru

gRPC Bidirectional Streaming

Overview:

In this tutorial, I would like to show you how to implement gRPC Bidirectional Streaming API in Java.

I assume that you have a basic understanding of what gRPC is. If not, read the below articles first.

  1. Protocol Buffers – A Simple Introduction
  2. gRPC – An Introduction Guide
  3. gRPC Unary API In Java – Easy Steps
  4. gRPC Server Streaming API In Java
  5. gRPC Client Streaming API In Java

gRPC Bidirectional Streaming:

In the gRPC Bidirectional streaming API, the client and server can exchange multiple messages between them via a single TCP connection. These message exchanges can be completely independent of each other. The client and server can close the call when they are done with the message exchanges.

For ex: Consider these application behaviors. They are good example of client-server bidirectional streaming.

Sample Application:

For this tutorial, We are going to create an interesting GPS for your car! That is – you would like to travel from point A to point B which is 100 units apart. Once you start driving, we would be syncing with the server every 3 seconds and tracking your position (in this case based on the distance you traveled in every 3 seconds). Our server will respond with remaining distance to the destination and approximate time taken to reach the destination until the trip is completed!

 

Protobuf – Service Definition:

Let’s create a service definition for this. navigate is going to be the method to be implemented on the server side. This service definition shows what type of input to be sent and what type of output to expect. We use the stream keyword for both input & output to indicate that it is going to be a bidirectional streaming request/response.

syntax = "proto3";

package gps;

option java_package = "com.vinsguru.gps";
option java_multiple_files = true;

message TripRequest {
  int32 distanceTravelled = 1;
}

message TripResponse {
  int32 remainingDistance = 1;
  int32 timeToDestination = 2;
}

service NavigationService {
  // grpc bidirectional stream
  rpc navigate(stream TripRequest) returns (stream TripResponse);
}

When we issue the below maven command, maven automatically creates the client and server side code using protoc tool.

mvn clean compile

NavigatorServiceImplBase class in the below picture is auto-generated abstract class which needs to be implemented by the server for the above service definition. Similarly NavigatorServiceStub is the actual implementation class which client should use to make a request

gRPC Bidirectional Streaming – Server Side:

Service Implementation: Let’s extend the abstract NavigatorServiceImplBase to add our implementation to respond to the navigate call. The server can expect multiple input objects from client as it is a streaming request. When the client calls onCompleted, it notifies the server that it has reached the destination. At that time server can also close the call.

public class TripRequestObserver implements StreamObserver<TripRequest> {

    private final int totalDistance = 100;
    private LocalTime startTime = LocalTime.now();
    private int distanceTraveled;
    private final StreamObserver<TripResponse> tripResponseStreamObserver;

    public TripRequestObserver(StreamObserver<TripResponse> tripResponseStreamObserver) {
        this.tripResponseStreamObserver = tripResponseStreamObserver;
    }

    @Override
    public void onNext(TripRequest tripRequest) {
        this.distanceTraveled = Math.min(totalDistance, (this.distanceTraveled + tripRequest.getDistanceTravelled()));
        int remainingDistance = Math.max(0, (totalDistance - distanceTraveled));

        // the client has reached destination
        if(remainingDistance == 0){
            this.tripResponseStreamObserver.onNext(TripResponse.getDefaultInstance());
            return;
        }

        // client has not yet reached destination
        long elapsedDuration = Duration.between(this.startTime, LocalTime.now()).getSeconds();
        elapsedDuration = elapsedDuration < 1 ? 1 : elapsedDuration;
        double currentSpeed = (distanceTraveled * 1.0d) / elapsedDuration;
        int timeToReach = (int) (remainingDistance / currentSpeed);
        TripResponse tripResponse = TripResponse.newBuilder()
                .setRemainingDistance(remainingDistance)
                .setTimeToDestination(timeToReach)
                .build();
        this.tripResponseStreamObserver.onNext(tripResponse);
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {
        this.tripResponseStreamObserver.onCompleted();
        System.out.println("Client reached safely");
    }

}

The above implementation is the actual business logic for our use case which needs to be executed whenever the client initiates a new navigate request.

public class NavigationService extends NavigationServiceGrpc.NavigationServiceImplBase {

    @Override
    public StreamObserver<TripRequest> navigate(StreamObserver<TripResponse> responseObserver) {
        return new TripRequestObserver(responseObserver);
    }

}

Once the service implementation is done, Let’s add it to the server to serve the client calls. We are listening on port 6565. Start this server by invoking the main method.

public class GPSServer {

    public static void main(String[] args) throws IOException, InterruptedException {

        // build gRPC server
        Server server = ServerBuilder.forPort(6565)
                .addService(new NavigationService())
                .build();

        // start
        server.start();

        // shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("GPS is shutting down!");
            server.shutdown();
        }));

        server.awaitTermination();

    }

}

Now our server is ready, up and running!

gRPC Bidirectional Streaming – Client Side:

Protobuf already has generated the client library. As a first step to make this request, we need to have an implementation for the StreamObserver.

public class TripResponseStreamObserver implements StreamObserver<TripResponse> {

    private StreamObserver<TripRequest> requestStreamObserver;

    @Override
    public void onNext(TripResponse tripResponse) {
       if(tripResponse.getRemainingDistance() > 0){
           print(tripResponse);
           this.drive();
       }else{
           this.requestStreamObserver.onCompleted();
       }
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {
        System.out.println("Trip Completed");
    }

    public void startTrip(StreamObserver<TripRequest> requestStreamObserver){
        this.requestStreamObserver = requestStreamObserver;
        this.drive();
    }

    private void drive(){
        Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
        TripRequest tripRequest = TripRequest.newBuilder().setDistanceTravelled(ThreadLocalRandom.current().nextInt(1, 10)).build();
        requestStreamObserver.onNext(tripRequest);
    }

    private void print(TripResponse tripResponse){
        System.out.println(LocalTime.now() + ": Remaining Distance : " + tripResponse.getRemainingDistance());
        System.out.println(LocalTime.now() + ": Time To Reach (sec): " + tripResponse.getTimeToDestination());
        System.out.println("------------------------------");
    }

}

The client has to do the following to make a request and receive the response.

To demo this, I am going to create a simple JUnit test class to act like a gRPC client. Do note that client can be anything. It could even be another microservice.

public class BiDirectionalStreamingTest {

    private ManagedChannel channel;
    private NavigationServiceGrpc.NavigationServiceStub clientStub;

    @Before
    public void setup(){
        this.channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext()
                .build();
        this.clientStub = NavigationServiceGrpc.newStub(channel);
    }

    @Test
    public void tripTest() throws InterruptedException {
        TripResponseStreamObserver tripResponseStreamObserver = new TripResponseStreamObserver();
        StreamObserver<TripRequest> requestStreamObserver = this.clientStub.navigate(tripResponseStreamObserver);
        tripResponseStreamObserver.startTrip(requestStreamObserver);
    }

    @After
    public void teardown(){
        this.channel.shutdown();
    }

}

Now we are ready to drive!

Demo:

When I run the test, I am able to see output as shown here.

...
14:25:03.214192: Remaining Distance : 23
14:25:03.214291: Time To Reach (sec): 12
------------------------------
14:25:06.217532: Remaining Distance : 15
14:25:06.217628: Time To Reach (sec): 7
------------------------------
14:25:09.220906: Remaining Distance : 7
14:25:09.221105: Time To Reach (sec): 3
------------------------------
14:25:12.224692: Remaining Distance : 5
14:25:12.224855: Time To Reach (sec): 2
------------------------------
Trip Completed

gRPC Course:

I learnt gRPC + Protobuf in a hard way. But you can learn them quickly on Udemy. Yes, I have created a separate step by step course on Protobuf + gRPC along with Spring Boot integration for the next generation Microservice development. Click here for the special link.


Summary:

We were able to successfully demonstrate the gRPC Bidirectional Streaming API.

Learn more about gRPC.

The source code is available here.

Happy learning 🙂

 

Share This:

Exit mobile version