gRPC Client Streaming

Overview:

In this tutorial, I would like to show you how to implement a gRPC client streaming API in Java.

I assume that you have a basic understanding of what gRPC is. If not, read the articles below first.

  1. Protocol Buffers – A Simple Introduction
  2. gRPC – An Introduction Guide
  3. gRPC Unary API In Java – Easy Steps
  4. gRPC Server Streaming API In Java

gRPC Client Streaming:

With the gRPC framework, we can send multiple messages between a client and a server over a single TCP connection. This is called multiplexing.

In the gRPC client streaming API, the client keeps sending multiple requests to the server. Once the client confirms that it has sent all the requests, the server sends a single response back to the client.

A real-life use case could be a file upload, in which the client sends a large file by splitting it into multiple small chunks, or a service that stores all the user click events in a database.
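To make the file upload idea concrete, here is a minimal sketch in plain Java of splitting a payload into small chunks. ChunkSplitter and its split method are hypothetical names used only for illustration; they are not part of gRPC. In a real upload, each chunk would be wrapped in a request message and sent through the stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of gRPC: splits a payload into fixed-size chunks.
class ChunkSplitter {

    // Returns consecutive slices of data, each at most chunkSize bytes long.
    static List<byte[]> split(byte[] data, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        int start = 0;
        while (start < data.length) {
            // the last chunk may be shorter than chunkSize
            int end = start + Math.min(chunkSize, data.length - start);
            chunks.add(Arrays.copyOfRange(data, start, end));
            start = end;
        }
        return chunks;
    }
}
```

For example, a 10-byte payload with a chunk size of 4 yields three chunks of 4, 4 and 2 bytes.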

Sample Application:

For this tutorial, to keep things simple, we are going to assume that the client sends multiple numbers one by one to the server, and the server sends back one final sum of all the numbers when the client is done with its request.
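Before involving gRPC, the expected behavior can be sketched in plain Java. RunningSum is a hypothetical stand-in for the server-side logic, with method names chosen to mirror the observer callbacks used later; it is an illustration under that assumption, not the generated code.

```java
// Hypothetical stand-in for the server-side logic; no gRPC types involved.
class RunningSum {

    private long sum = 0;

    // Called once per number the client sends.
    void onNext(int number) {
        sum += number;
    }

    // Called once when the client signals it is done; yields the single response.
    long onCompleted() {
        return sum;
    }
}
```

Feeding the numbers 0 through 100 into onNext and then calling onCompleted returns 5050.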

Protobuf – Service Definition:

Now we know what the business requirement is and what the client expects, so let’s create a service definition for it. sumAll is the method to be implemented on the server side. The service definition shows what type of input is sent and what type of output to expect. We use the stream keyword on the input to indicate that this is a client-side streaming request. The server is expected to send only one response.

syntax = "proto3";

package calculator;

option java_package = "com.vinsguru.calculator";
option java_multiple_files = true;

message Input {
  int32 number = 1;
}

message Output {
  int64 result = 1;
}

service CalculatorService {
  // client stream
  rpc sumAll(stream Input) returns (Output) {};
}

When we run the Maven command below, Maven generates the client and server side code using the protoc tool.

mvn clean compile

CalculatorServiceImplBase is the auto-generated abstract class that the server must extend to implement the above service definition. Similarly, CalculatorServiceStub is the generated class that the client uses to make a request.

gRPC Client Streaming – Server Side:

Service Implementation: Let’s extend the abstract CalculatorServiceImplBase to add our implementation of the sumAll call. The server can expect multiple Input objects from the client because it is a streaming request. When the client calls onCompleted, it notifies the server that it has sent everything. At that point the server can respond.

  • Success response: We add up the numbers as we receive them and respond using responseObserver. We return the resulting Output object to the calling client via the onNext method, and we notify the client that the server is done with this call by calling onCompleted.
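  • Error response: If anything goes wrong while processing the request, the server uses onError to notify the client. We will discuss that in a separate article.

import com.vinsguru.calculator.Input;
import com.vinsguru.calculator.Output;
import io.grpc.stub.StreamObserver;

public class InputStreamObserver implements StreamObserver<Input> {

    // long, because Output.result is an int64 and the total can outgrow an int
    private long sum = 0;
    private final StreamObserver<Output> outputStreamObserver;

    public InputStreamObserver(StreamObserver<Output> outputStreamObserver) {
        this.outputStreamObserver = outputStreamObserver;
    }

    // invoked once for every Input the client sends
    @Override
    public void onNext(Input input) {
        sum = sum + input.getNumber();
    }

    // error handling is out of scope for this example
    @Override
    public void onError(Throwable throwable) {

    }

    // invoked when the client is done; send the single response and close the call
    @Override
    public void onCompleted() {
        Output output = Output.newBuilder().setResult(sum).build();
        outputStreamObserver.onNext(output);
        outputStreamObserver.onCompleted();
    }

}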

The above implementation is the actual business logic for our use case which needs to be executed whenever a new RPC is received for sumAll.
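The service definition declares Input.number as int32 and Output.result as int64, so the running total can outgrow the int range. The sketch below is plain Java with no gRPC types, and AccumulatorOverflowDemo is a hypothetical name. It compares the two accumulator types to show why a long is the safer choice for the running total.

```java
// Illustration only: compares an int accumulator with a long accumulator.
class AccumulatorOverflowDemo {

    // Sums with an int; silently wraps around on overflow.
    static int sumAsInt(int[] numbers) {
        int sum = 0;
        for (int n : numbers) {
            sum += n;
        }
        return sum;
    }

    // Sums with a long; each int is widened before it is added.
    static long sumAsLong(int[] numbers) {
        long sum = 0L;
        for (int n : numbers) {
            sum += n;
        }
        return sum;
    }
}
```

With the inputs Integer.MAX_VALUE and 1, sumAsInt wraps around to Integer.MIN_VALUE, while sumAsLong returns 2147483648.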

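import com.vinsguru.calculator.CalculatorServiceGrpc;
import com.vinsguru.calculator.Input;
import com.vinsguru.calculator.Output;
import io.grpc.stub.StreamObserver;

public class ClientStreamingSumService extends CalculatorServiceGrpc.CalculatorServiceImplBase {

    // called once per RPC; returns the observer that receives the client's stream
    @Override
    public StreamObserver<Input> sumAll(StreamObserver<Output> responseObserver) {
        return new InputStreamObserver(responseObserver);
    }

}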

Once the service implementation is done, let’s add it to the server so that it can serve client calls. We are listening on port 6565. Start the server by invoking the main method.

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class CalculatorServer {

    public static void main(String[] args) throws IOException, InterruptedException {

        // build gRPC server
        Server server = ServerBuilder.forPort(6565)
                .addService(new ClientStreamingSumService())
                .build();

        // start
        server.start();

        // shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("gRPC server is shutting down!");
            server.shutdown();
        }));

        // block the main thread until the server has shut down
        server.awaitTermination();

    }

}

Now our server is ready, up and running!

gRPC Client Streaming – Client Side:

The protoc compiler has already generated the client library. As a first step in making this request, we need an implementation of StreamObserver. It simply prints the sum when the client receives the response from the server.

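import com.vinsguru.calculator.Output;
import io.grpc.stub.StreamObserver;

public class OutputStreamObserver implements StreamObserver<Output> {

    // invoked once, with the single response from the server
    @Override
    public void onNext(Output output) {
        System.out.println(
                "Received : " + output.getResult()
        );
    }

    // print the failure so that it is not silently dropped
    @Override
    public void onError(Throwable throwable) {
        System.out.println(
                "Error : " + throwable.getMessage()
        );
    }

    // invoked when the server has completed the call; nothing to do here
    @Override
    public void onCompleted() {

    }

}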

The client has to do the following to make a request and receive the response.

  • Channel: The client first has to create a channel/connection to the back-end server.
  • Stub: The client uses a non-blocking stub to make the request, passing the required parameters one after another.

To demo this, I am going to create a simple JUnit test class that acts as a gRPC client. Note that the client can be anything. It could even be another microservice.

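import com.vinsguru.calculator.CalculatorServiceGrpc;
import com.vinsguru.calculator.Input;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

public class ClientStreamingTest {

    private ManagedChannel channel;
    private CalculatorServiceGrpc.CalculatorServiceStub clientStub;

    @Before
    public void setup(){
        this.channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext()
                .build();
        this.clientStub = CalculatorServiceGrpc.newStub(channel);
    }

    @Test
    public void clientStreamingSumTest() {

        // pass the output stream observer & receive the input stream observer
        StreamObserver<Input> inputStreamObserver = this.clientStub.sumAll(new OutputStreamObserver());

        for (int i = 0; i <= 100; i++) {
            // build the request object
            Input input = Input.newBuilder()
                    .setNumber(i)
                    .build();
            // pass the request object via input stream observer
            inputStreamObserver.onNext(input);
        }

        // client side is done. this method makes the server respond with the sum value
        inputStreamObserver.onCompleted();

    }

    @After
    public void teardown() throws InterruptedException {
        // the stub is non-blocking, so give the in-flight call time to finish
        // before the test exits; shutdown() lets existing calls complete
        this.channel.shutdown();
        this.channel.awaitTermination(5, TimeUnit.SECONDS);
    }

}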

Now we are ready to send the request and receive the response. When we run the client side code, the client sends the numbers 0 through 100 (101 messages) to the server. When it calls onCompleted, the server responds with the sum, 5050, which is printed on the client side console output.
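Because the stub is non-blocking, the response arrives on another thread, so a test has to wait for it before finishing. One common way to do that is a CountDownLatch. The sketch below is plain Java: ResultCallback is a hypothetical interface standing in for StreamObserver<Output>, and LatchedCallback is an illustrative name, not something from the gRPC library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical callback, standing in for StreamObserver<Output>.
interface ResultCallback {
    void onResult(long result);
    void onDone();
}

// Records the result and lets a caller block until the call has finished.
class LatchedCallback implements ResultCallback {

    private final CountDownLatch done = new CountDownLatch(1);
    private final AtomicLong result = new AtomicLong();

    @Override
    public void onResult(long value) {
        result.set(value);
    }

    @Override
    public void onDone() {
        done.countDown();
    }

    // Waits up to the timeout; returns true only if onDone was called in time.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            // restore the interrupt flag and report that we did not finish
            Thread.currentThread().interrupt();
            return false;
        }
    }

    long result() {
        return result.get();
    }
}
```

In a test, the same idea would apply to the real observer: count down the latch in onCompleted and onError, then assert on the recorded value after await returns true.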

Summary:

We were able to successfully demonstrate the gRPC Client Streaming API with a simple example.

Learn more about gRPC.

The source code is available here.

Happy learning 🙂
