Site icon Vinsguru

gRPC File Upload With Client Streaming

Overview:

In this gRPC File Upload tutorial, I would like to show you how we could make use of gRPC client streaming feature to implement file upload functionality for your application.

If you are new to gRPC, I request you to take a look at these articles first.

gRPC File Upload:

gRPC is a great choice for client-server application development or good alternate for replacing traditional REST based inter-microservices communication. gRPC provides 4 different RPC types. One of them is Client streaming in which client can send multiple requests to the server as part of single RPC/connection. We are going to make use of this, upload a large file as small chunks into the server to implement this gRPC file upload functionality.

(I just wanted to write this as a detailed article after answering this question in the stack overflow.)

Protobuf Definition:

When we upload a file into a server, we might want to send metadata about the file along with the file content. The protobuf message type could be more or less like this.

message FileUploadRequest {
    MetaData metadata = 1;
    File file = 2;
}

We might not be able to send a large file in a single request due to various limitations. So, we need to send them as small chunks asynchronously.  In this case, we will end up sending the above request type again and again. This will make us send the metadata again and again which is not required.  This is where protobuf oneof helps.

Lets create protobuf message types for request and response etc as shown here.

package file;

option java_package = "com.vinsguru.io";
option java_multiple_files = true;

message MetaData {
  string name = 1;
  string type = 2;
}

message File {
  bytes content = 1;
}

enum Status {
  PENDING = 0;
  IN_PROGRESS = 1;
  SUCCESS = 2;
  FAILED = 3;
}

message FileUploadRequest {
  oneof request {
    MetaData metadata = 1;
    File file = 2;
  }
}

message FileUploadResponse {
  string name = 1;
  Status status = 2;
}

service FileService {
  rpc upload(stream FileUploadRequest) returns(FileUploadResponse);
}

gRPC Course:

I learnt gRPC + Protobuf in a hard way. But you can learn them quickly on Udemy. Yes, I have created a separate step by step course on Protobuf + gRPC along with Spring Boot integration for the next generation Microservice development. Click here for the special link.


gRPC File Upload – Server Side:

The client would be sending the file as small chunks as a streaming requests. The server assumes that first request would be the metadata request and subsequent request would be for file content. The server will be writing the file content as and when it receives. When the client calls the onCompleted method, the server will close & save the file on its side as the client has notified the server that it has sent everything by calling the onCompleted method.

public class FileUploadService extends FileServiceGrpc.FileServiceImplBase {

    private static final Path SERVER_BASE_PATH = Paths.get("src/test/resources/output");

    @Override
    public StreamObserver<FileUploadRequest> upload(StreamObserver<FileUploadResponse> responseObserver) {
        return new StreamObserver<FileUploadRequest>() {
            // upload context variables
            OutputStream writer;
            Status status = Status.IN_PROGRESS;

            @Override
            public void onNext(FileUploadRequest fileUploadRequest) {
                try{
                    if(fileUploadRequest.hasMetadata()){
                        writer = getFilePath(fileUploadRequest);
                    }else{
                        writeFile(writer, fileUploadRequest.getFile().getContent());
                    }
                }catch (IOException e){
                    this.onError(e);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                status = Status.FAILED;
                this.onCompleted();
            }

            @Override
            public void onCompleted() {
                closeFile(writer);
                status = Status.IN_PROGRESS.equals(status) ? Status.SUCCESS : status;
                FileUploadResponse response = FileUploadResponse.newBuilder()
                        .setStatus(status)
                        .build();
                responseObserver.onNext(response);
                responseObserver.onCompleted();
            }
        };
    }

    private OutputStream getFilePath(FileUploadRequest request) throws IOException {
        var fileName = request.getMetadata().getName() + "." + request.getMetadata().getType();
        return Files.newOutputStream(SERVER_BASE_PATH.resolve(fileName), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    private void writeFile(OutputStream writer, ByteString content) throws IOException {
        writer.write(content.toByteArray());
        writer.flush();
    }

    private void closeFile(OutputStream writer){
        try {
            writer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
Server server = ServerBuilder
        .forPort(6565)
        .addService(new FileUploadService())
        .build();

// start
server.start();

gRPC File Upload – Client Streaming:

Our server side looks good. Lets work on the client side.

    private ManagedChannel channel;
    private FileServiceGrpc.FileServiceStub fileServiceStub;

    public void setup(){
        this.channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext()
                .build();
        this.fileServiceStub = FileServiceGrpc.newStub(channel);
    }
class FileUploadObserver implements StreamObserver<FileUploadResponse> {

    @Override
    public void onNext(FileUploadResponse fileUploadResponse) {
        System.out.println(
                "File upload status :: " + fileUploadResponse.getStatus()
        );
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {

    }

}
// request observer
StreamObserver<FileUploadRequest> streamObserver = this.fileServiceStub.upload(new FileUploadObserver());

// input file for testing
Path path = Paths.get("src/test/resources/input/java_input.pdf");

// build metadata
FileUploadRequest metadata = FileUploadRequest.newBuilder()
        .setMetadata(MetaData.newBuilder()
                .setName("output")
                .setType("pdf").build())
        .build();
streamObserver.onNext(metadata);

// upload file as chunk
InputStream inputStream = Files.newInputStream(path);
byte[] bytes = new byte[4096];
int size;
while ((size = inputStream.read(bytes)) > 0){
    FileUploadRequest uploadRequest = FileUploadRequest.newBuilder()
            .setFile(File.newBuilder().setContent(ByteString.copyFrom(bytes, 0 , size)).build())
            .build();
    streamObserver.onNext(uploadRequest);
}

// close the stream
inputStream.close();
streamObserver.onCompleted();

Demo:

When we run the test, a large file is sent as 4KB file chunks as part of streaming requests to the server. When the client is done with streaming, it invokes the onCompleted method which makes the server closes the file and sends the final status back to the client.

File upload status :: SUCCESS

Summary:

We were able to successfully demonstrate the grpc file upload using client streaming request. We also understood how to use oneof type in protobuf.

The source code is here.

Happy learning 🙂

Share This:

Exit mobile version