Site icon Vinsguru

gRPC Server Streaming

Overview:

In this tutorial, I would like to show you how to implement gRPC Server Streaming API in Java.

I assume that you have a basic understanding of what gRPC is. If not, read the below articles first.

  1. Protocol Buffers – A Simple Introduction
  2. gRPC – An Introduction Guide
  3. gRPC Unary API In Java – Easy Steps

gRPC Server Streaming:

With gRPC framework , we should be able to send multiple messages between a client and a server via a single TCP connection. It is called Multiplexing.

In gRPC server streaming API,  client sends a single request to the server for which the client can expect multiple responses back. The server will notify the client once it has sent all the possible responses.

A real life use case could be taxi-booking/uber functionality in which client places a taxi request for which uber might send multiple responses who the driver is, where the driver is, how long the driver will take to pick you up…etc.

Sample Application:

For this tutorial, We are going to create a simple e-commerce application in which the client can place an order request. Once the server receives the order request, depends on the order category, it will take certain steps and deliver the product to the client.

Protobuf – Service Definition:

Now we know what the business requirement is & what the client expects! So let’s create a service definition for this as shown below. placeOrder is going to be the method to be implemented on the server side. This service definition shows what type of input to be sent and what type of output to expect. We use the stream keyword for the output to indicate that it is going to be a server side streaming response for a single client request.

syntax = "proto3";

package ecommerce;

option java_package = "com.vinsguru.ecommerce";
option java_multiple_files = true;

enum Category {
  ELECTRONICS = 0;
  EBOOKS = 1;
}

enum Status {
  PAYMENT_RECEIVED = 0;
  SHIPPED = 1;
  OUT_FOR_DELIVERY = 2;
  DELIVERED = 3;
}

message OrderRequest {
  Category orderCategory = 1;
}

message OrderResponse {
  Status orderStatus = 1;
}

service ECommerceService {
  // server stream
  rpc placeOrder(OrderRequest) returns (stream OrderResponse);
}

When we issue the below maven command, maven automatically creates the client and server side code using protoc tool.

mvn clean compile

ECommerceServiceImplBase class in the below picture is auto-generated abstract class which needs to be implemented by the server for the above service definition. Similarly ECommerceServiceStub is the actual implementation class which client should use to make a request.

gRPC Server Streaming – Server Side:

Service Implementation: Let’s extend the abstract ECommerceServiceImplBase to add our implementation to respond to the placeOrder call. The server will receive an order request. Depends on the order category, the server has to send appropriate responses to client.

public class AmazonOnlineService extends ECommerceServiceGrpc.ECommerceServiceImplBase {

    Map<Category, Consumer<StreamObserver<OrderResponse>>> categoryHandler = Map.of(
        Category.ELECTRONICS, this::handleElectronics,
        Category.EBOOKS, this::handleEBooks
    );

    @Override
    public void placeOrder(OrderRequest request, StreamObserver<OrderResponse> responseObserver) {
        categoryHandler.get(request.getOrderCategory()).accept(responseObserver);
        responseObserver.onCompleted();
    }

    private void handleElectronics(StreamObserver<OrderResponse> responseStreamObserver){
        Stream.of(PAYMENT_RECEIVED, SHIPPED, OUT_FOR_DELIVERY, DELIVERED)
                .map(OrderResponse.newBuilder()::setOrderStatus)
                .map(OrderResponse.Builder::build)
                .peek(i -> Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS))
                .forEach(responseStreamObserver::onNext);
    }

    private void handleEBooks(StreamObserver<OrderResponse> responseStreamObserver){
        Stream.of(PAYMENT_RECEIVED, DELIVERED)
                .map(OrderResponse.newBuilder()::setOrderStatus)
                .map(OrderResponse.Builder::build)
                .forEach(responseStreamObserver::onNext);
    }

}

Once the service implementation is done, Let’s add it to the server to serve the client calls. We are listening on port 6565. Start this server by invoking the main method.

public class AmazonServer {

    public static void main(String[] args) throws IOException, InterruptedException {

        // build gRPC server
        Server server = ServerBuilder.forPort(6565)
                .addService(new AmazonOnlineService())
                .build();

        // start
        server.start();

        // shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Amazon server is shutting down!");
            server.shutdown();
        }));

        server.awaitTermination();

    }

}

Now our server is ready, up and running!

gRPC Server Streaming – Client Side:

Protobuf already has generated the client library. As a first step to make this request, we need to have an implementation for the StreamObserver. Because we are expecting multiple responses back from the server asynchronously. This is to just print the order status when the client receives the response from the server.

public class OrderResponseStreamObserver implements StreamObserver<OrderResponse> {

    @Override
    public void onNext(OrderResponse orderResponse) {
        System.out.println(
                LocalDateTime.now() + " : " + orderResponse.getOrderStatus()
        );
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {

    }

}

The client has to do the following to make a request and receive the response.

To demo this, I am going to create a simple JUnit test class to act like a gRPC client. Do note that client can be anything. It could even be another microservice.

public class ServerStreamingTest {

    private ManagedChannel channel;
    private ECommerceServiceGrpc.ECommerceServiceStub clientStub;

    @Before
    public void setup(){
        this.channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext()
                .build();
        this.clientStub = ECommerceServiceGrpc.newStub(channel);
    }

    @Test
    public void bookStreamingTest() throws InterruptedException {
        OrderRequest ebook = OrderRequest.newBuilder().setOrderCategory(Category.EBOOKS).build();
        this.clientStub.placeOrder(ebook, new OrderResponseStreamObserver());
    }

    @Test
    public void electronicStreamingTest() throws InterruptedException {
        OrderRequest electronic = OrderRequest.newBuilder().setOrderCategory(Category.ELECTRONICS).build();
        this.clientStub.placeOrder(electronic, new OrderResponseStreamObserver());
    }

    @After
    public void teardown(){
        this.channel.shutdown();
    }

}

Demo:

When we run the test, depends on the order category we get appropriate responses.

2020-11-02T21:02:26.864986 : PAYMENT_RECEIVED
2020-11-02T21:02:26.865636 : DELIVERED
2020-11-02T21:02:38.047878 : PAYMENT_RECEIVED
2020-11-02T21:02:41.042821 : SHIPPED
2020-11-02T21:02:44.039482 : OUT_FOR_DELIVERY
2020-11-02T21:02:47.039461 : DELIVERED

Summary:

We were able to successfully demonstrate the gRPC Server Streaming API with a simple example.

Learn more about gRPC.

The source code is available here.

Happy learning 🙂

Share This:

Exit mobile version