RSocket File Upload Example

Overview:

In this tutorial, I would like to show you RSocket file upload: how we can upload large files in a fully reactive programming style with back-pressure support.

RSocket is a message-passing protocol for multiplexed, duplex communication that supports TCP, WebSockets and Aeron (UDP). If you are new to RSocket, take a look at these articles to learn more.

RSocket File Upload:

In an application, when one component, say A, is very slow in responding to requests, the request sender might want to slow down the rate at which it sends them to avoid putting further stress on the component under load. Otherwise component A might crash, lose data, and so on. With reactive programming and back-pressure support, the sender is aware of the rate at which component A can process requests and sends a request only when A is able to process it. This also saves the sender from doing unnecessary work.
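To make the demand-driven idea concrete, here is a minimal single-threaded sketch built on the JDK's java.util.concurrent.Flow interfaces (Java 9+), not on RSocket or Reactor. The names BackPressureSketch, ChunkPublisher and run are made up for illustration, and the "receiver" simply asks for a fixed batch at a time; a real transport is asynchronous and far more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical demo: the sender emits a chunk only when the receiver has asked for it. */
final class BackPressureSketch {

    /** Emits chunk numbers 1..total, but never more than the outstanding demand. */
    static final class ChunkPublisher implements Flow.Publisher<Integer> {
        private final int total;
        private final List<String> log;

        ChunkPublisher(int total, List<String> log) {
            this.total = total;
            this.log = log;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean draining = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    demand += n;
                    if (draining) {
                        return; // re-entrant call from onNext: the loop below picks up the new demand
                    }
                    draining = true;
                    while (demand > 0 && next <= total && !done) {
                        demand--;
                        log.add("sent " + next);
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (next > total && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Receiver that requests `batch` chunks and asks again only after processing them. */
    static List<String> run(int total, int batch) {
        List<String> log = new ArrayList<>();
        new ChunkPublisher(total, log).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int received = 0;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("requested " + batch);
                s.request(batch);
            }
            @Override public void onNext(Integer chunk) {
                log.add("processed " + chunk);
                if (++received % batch == 0) {
                    log.add("requested " + batch);
                    subscription.request(batch);
                }
            }
            @Override public void onError(Throwable t) { log.add("failed"); }
            @Override public void onComplete() { log.add("completed"); }
        });
        return log;
    }
}
```

Calling BackPressureSketch.run(5, 2) yields a log in which no "sent" entry ever appears without a preceding "requested" entry covering it, which is the essence of back pressure: the receiver, not the sender, sets the pace.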

For example, let's assume a client tries to upload a 10 GB file. The client might have 100 GB of RAM, and might even hold the entire file in memory and try to send it to the server in a single upload request. But the poor server with 1 GB of RAM might not be able to process the request, or it might take a significant amount of time to process it and tell the client whether the request completed or failed.

We also do not want to block the connection between the client and the server during this processing. A request might fail after 90% of the upload for some reason, in which case the client has to send the whole request again. As you can see, there are many issues with this approach.

It is better if the client sends the file as a series of small chunks and the server writes each chunk as and when it can, as shown in the figure below. That way the client knows the progress of the file upload. If a specific chunk is lost, the client can resend that chunk instead of sending the whole file. The client can also do other tasks if the server is very slow.

[Figure: RSocket file upload]
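As a rough illustration of chunking, the following plain-JDK sketch reads an input stream in fixed-size pieces and hands each piece to a callback, so the whole file never has to sit in memory at once. ChunkReaderSketch and forEachChunk are hypothetical names, and the sketch is blocking; it only shows the splitting step, not the reactive pipeline used later in this tutorial.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.function.Consumer;

/** Hypothetical helper: reads a stream chunk by chunk instead of loading it whole. */
final class ChunkReaderSketch {

    /**
     * Passes successive chunks of at most chunkSize bytes to the sink
     * and returns how many chunks were produced. Requires Java 9+ (readNBytes).
     */
    static int forEachChunk(InputStream in, int chunkSize, Consumer<byte[]> sink) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        byte[] buffer = new byte[chunkSize];
        int count = 0;
        try {
            int read;
            // readNBytes fills the buffer unless the end of the stream is reached first
            while ((read = in.readNBytes(buffer, 0, chunkSize)) > 0) {
                sink.accept(Arrays.copyOf(buffer, read)); // copy, because the buffer is reused
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }
}
```

For a 10,000-byte input and a chunk size of 4096, the sink receives chunks of 4096, 4096 and 1808 bytes.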

Let’s see how we can achieve this using RSocket + Spring Boot.

Sample Application:

Just create a simple Spring Boot application with the RSocket dependency.

We will develop a simple client and server application which will do the following.

  • The client will send a PDF to the server as a stream of byte chunks of up to 4096 bytes each.
  • The server will write each chunk and respond to the client with a status.
  • Once the client signals that it has sent everything, the server confirms that the file has been completely written on the server side.
  • If something unexpected happens, the server will respond with a FAILED status.
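The status sequence described in the list above can be modelled with a few lines of plain Java. This is only a sketch of the contract, not the server code: StatusSequenceSketch, UploadStatus and statusesFor are hypothetical names, and the enum simply mirrors the three statuses used in this tutorial so that the example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the statuses a client would observe during an upload. */
final class StatusSequenceSketch {

    /** Mirrors the tutorial's status values; redefined here to keep the sketch self-contained. */
    enum UploadStatus { CHUNK_COMPLETED, COMPLETED, FAILED }

    /**
     * Statuses for an upload of totalChunks chunks.
     * failingChunk is the 1-based index of the chunk whose write fails, or 0 for no failure.
     */
    static List<UploadStatus> statusesFor(int totalChunks, int failingChunk) {
        List<UploadStatus> statuses = new ArrayList<>();
        for (int chunk = 1; chunk <= totalChunks; chunk++) {
            if (chunk == failingChunk) {
                statuses.add(UploadStatus.FAILED); // the stream ends here; no COMPLETED follows
                return statuses;
            }
            statuses.add(UploadStatus.CHUNK_COMPLETED);
        }
        statuses.add(UploadStatus.COMPLETED); // sent once, after the last chunk
        return statuses;
    }
}
```

A successful three-chunk upload produces three CHUNK_COMPLETED statuses followed by COMPLETED, while a failure on the second chunk produces CHUNK_COMPLETED followed by FAILED.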

Models:

  • Status
    • CHUNK_COMPLETED for individual chunks
    • COMPLETED for the final upload
    • FAILED if the upload failed for some reason
public enum Status {

    CHUNK_COMPLETED,
    COMPLETED,
    FAILED;

}
  • MIME types and other parameters
public class Constants {

    public static final String MIME_FILE_EXTENSION   = "message/x.upload.file.extension";
    public static final String MIME_FILE_NAME        = "message/x.upload.file.name";
    public static final String FILE_NAME = "file-name";
    public static final String FILE_EXTN = "file-extn";

}

RSocket File Upload – Server Side:

  • Service:
@Service
public class FileUploadService {

    @Value("${output.file.path:src/test/resources/output}")
    private Path outputPath;

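    public Flux<Status> uploadFile(Path path, Flux<DataBuffer> bufferFlux) throws IOException {
        Path opPath = outputPath.resolve(path);
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(opPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        // DataBufferUtils.write neither releases the buffers it emits nor closes the channel, so do both here
        return DataBufferUtils.write(bufferFlux, channel)
                            .doOnNext(DataBufferUtils::release)
                            .map(b -> Status.CHUNK_COMPLETED)
                            .doFinally(signal -> {
                                try { channel.close(); } catch (IOException ignored) { }
                            });
    }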

}
  • Controller
@Controller
public class FileUploadController {

    @Autowired
    private FileUploadService service;

    @MessageMapping("file.upload")
    public Flux<Status> upload(@Headers Map<String, Object> metadata, @Payload Flux<DataBuffer> content) throws IOException {
        var fileName = metadata.get(Constants.FILE_NAME);
        var fileExtn = metadata.get(Constants.FILE_EXTN);
        var path = Paths.get(fileName + "." + fileExtn);
        return Flux.concat(service.uploadFile(path, content), Mono.just(Status.COMPLETED))
                    .onErrorReturn(Status.FAILED);

    }

}
  • application.properties for the RSocket port
spring.rsocket.server.port=6565
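To show what writing chunks through an AsynchronousFileChannel involves, here is a plain-JDK sketch without Spring or Reactor. It is deliberately simplified: it blocks on each write with Future.get(), whereas the FileUploadService above relies on DataBufferUtils.write to do the writing reactively. ChannelWriteSketch, writeChunks and roundTrip are hypothetical names used only for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.ExecutionException;

/** Hypothetical demo: appends chunks to a file at increasing offsets. */
final class ChannelWriteSketch {

    /** Writes the chunks back to back and returns the total number of bytes written. */
    static long writeChunks(Path target, List<byte[]> chunks) {
        // try-with-resources closes the channel whether the writes succeed or fail
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                target, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            long position = 0;
            for (byte[] chunk : chunks) {
                ByteBuffer buffer = ByteBuffer.wrap(chunk);
                while (buffer.hasRemaining()) {
                    // write() is asynchronous; get() blocks until this write has finished
                    position += channel.write(buffer, position).get();
                }
            }
            return position;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Demo helper: writes the chunks to a temporary file and reads the bytes back. */
    static byte[] roundTrip(List<byte[]> chunks) {
        try {
            Path tmp = Files.createTempFile("upload-sketch", ".bin");
            try {
                writeChunks(tmp, chunks);
                return Files.readAllBytes(tmp);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Each chunk is written at the offset where the previous one ended, so the file is rebuilt in order. Note that opening with CREATE and WRITE does not truncate an existing file; if the same output name may be reused for a shorter upload, adding StandardOpenOption.TRUNCATE_EXISTING is probably worth considering.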

Configuration:

@Configuration
public class RSocketConfig {

    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .metadataExtractorRegistry(metadataExtractorRegistry -> {
                    metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(Constants.MIME_FILE_EXTENSION), String.class, Constants.FILE_EXTN);
                    metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(Constants.MIME_FILE_NAME), String.class, Constants.FILE_NAME);
                })
                .build();
    }

    @Bean
    public Mono<RSocketRequester> getRSocketRequester(RSocketRequester.Builder builder){
        return builder
                .rsocketConnector(rSocketConnector -> rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
                .connect(TcpClientTransport.create(6565));
    }

}

RSocket File Upload – Client Side:

@SpringBootTest
class FileUploadApplicationTests {

    @Autowired
    private Mono<RSocketRequester> rSocketRequester;

    @Value("classpath:input/java_tutorial.pdf")
    private Resource resource;

    @Test
    public void uploadFile()  {

        // read the input file in chunks of up to 4096 bytes
        Flux<DataBuffer> readFlux = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 4096)
                .doOnNext(s -> System.out.println("Sent"));

        // rsocket request
        this.rSocketRequester
                .map(r -> r.route("file.upload")
                        .metadata(metadataSpec -> {
                            metadataSpec.metadata("pdf", MimeType.valueOf(Constants.MIME_FILE_EXTENSION));
                            metadataSpec.metadata("output", MimeType.valueOf(Constants.MIME_FILE_NAME));
                        })
                        .data(readFlux)
                )
                .flatMapMany(r -> r.retrieveFlux(Status.class))
                .doOnNext(s -> System.out.println("Upload Status : " + s))
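                // block until the final status arrives; a bare subscribe() lets the test return before the upload finishes
                .blockLast();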
        
    }

}
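  • When we run this test, we see the output below. Note that @SpringBootTest starts the RSocket server in the same application context, so the server does not need to be started separately.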
...
...
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Upload Status : COMPLETED

Back Pressure Test:

Let's simulate a slow server by slowing down the write on the server side. Let's assume each chunk takes 1 second; I added delayElements to simulate that.

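public Flux<Status> uploadFile(Path path, Flux<DataBuffer> bufferFlux) throws IOException {
    Path opPath = outputPath.resolve(path);
    AsynchronousFileChannel channel = AsynchronousFileChannel.open(opPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    // delayElements lets one chunk through per second to simulate a slow disk
    return DataBufferUtils.write(bufferFlux.delayElements(Duration.ofSeconds(1)), channel)
                        .doOnNext(DataBufferUtils::release) // write() does not release the buffers it emits
                        .map(b -> Status.CHUNK_COMPLETED)
                        .doFinally(signal -> {
                            try { channel.close(); } catch (IOException ignored) { }
                        });
}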

We will see a slightly different result this time. We will first see 32 Sent messages (chunks). (32 is the default initial request size, which can be adjusted via a Reactor system property.) Because the server has not yet asked for more, the client then stops sending. Once the server has processed some chunks and requested more, the client sends a few more chunks and waits again. This continues until the file upload is 100% complete.

Sent
Sent
...
...
Sent
Sent
Upload Status : CHUNK_COMPLETED
...
...
Upload Status : CHUNK_COMPLETED
Sent
Sent
...
...
Sent
...
Upload Status : CHUNK_COMPLETED
...
...
Upload Status : CHUNK_COMPLETED
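The burst pattern in this output can be approximated with a small calculation. The sketch below is a simplified model, not Reactor's actual algorithm: it assumes the sender gets an initial grant (32 here, as observed above) and afterwards receives further grants of a fixed size. That replenish size (24 in the usage note) is purely an illustrative assumption, and DemandWindowSketch and bursts are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of how many chunks are sent in each burst under bounded demand. */
final class DemandWindowSketch {

    /**
     * Returns the size of each burst of "Sent" lines for an upload of totalChunks chunks,
     * given an initial grant and a fixed grant for every later round.
     */
    static List<Integer> bursts(int totalChunks, int initialRequest, int replenish) {
        if (initialRequest <= 0 || replenish <= 0) {
            throw new IllegalArgumentException("grants must be positive");
        }
        List<Integer> bursts = new ArrayList<>();
        int remaining = totalChunks;
        int grant = initialRequest;
        while (remaining > 0) {
            int burst = Math.min(grant, remaining); // never send more than was granted
            bursts.add(burst);
            remaining -= burst;
            grant = replenish; // later rounds use the replenish size
        }
        return bursts;
    }
}
```

For example, bursts(100, 32, 24) gives [32, 24, 24, 20]: one initial burst of 32 chunks, then smaller bursts as the server grants more demand, until all 100 chunks are sent.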

Without back-pressure support, the client would have sent everything at once, possibly thousands of chunks, and then waited a long time only to find out later that the file was never uploaded!

Summary:

We were able to successfully demonstrate RSocket File Upload with Spring Boot.

Learn more about RSocket / Reactive Programming.

The complete source code is here.

Happy learning 🙂

 


3 thoughts on “RSocket File Upload Example”

  1. You don’t explain how to run this. I am telling my NetBeans IDE to use Java 11. You don’t say whether we are supposed to run the server, so I tried running the server. What is the “client”? Are we supposed to run FileUploadApplicationTests.java? Either way I get an error.
    If I start c.v.r.fileupload.FileUploadApplication as the “server”, it starts with: Netty RSocket started on port(s): 8900 (I changed the port)

    If I then run the test app, it fails to bind to the port:
    Error starting ApplicationContext. To display the conditions report re-run your application with ‘debug’ enabled.
    2021-01-08 18:09:56.469 ERROR 52480 — [ main] o.s.boot.SpringApplication : Application run failed

    org.springframework.context.ApplicationContextException: Failed to start bean ‘rSocketServerBootstrap’; nested exception is reactor.netty.ChannelBindException: Failed to bind on [0.0.0.0:8900]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]

    If I run just FileUploadApplicationTests, without the server running, I get:
    2021-01-08 18:12:40.868 INFO 52700 — [ main] c.v.r.f.FileUploadApplicationTests : Starting FileUploadApplicationTests on stan-pc with PID 52700 (started by stan in ……./git-repos/vinsguru-blog-code-samples/rsocket/file-upload)
    2021-01-08 18:12:40.869 INFO 52700 — [ main] c.v.r.f.FileUploadApplicationTests : No active profile set, falling back to default profiles: default
    2021-01-08 18:12:41.702 INFO 52700 — [ main] o.s.b.rsocket.netty.NettyRSocketServer : Netty RSocket started on port(s): 8900
    2021-01-08 18:12:41.710 INFO 52700 — [ main] c.v.r.f.FileUploadApplicationTests : Started FileUploadApplicationTests in 1.035 seconds (JVM running for 1.735)
    Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.592 s – in com.vinsguru.rsocket.fileupload.FileUploadApplicationTests
    Sent
    2021-01-08 18:12:42.082 ERROR 52700 — [ parallel-2] reactor.core.scheduler.Schedulers : Scheduler worker in group main failed with an uncaught exception

    reactor.core.Exceptions$ErrorCallbackNotImplemented: java.nio.channels.ClosedChannelException
    Caused by: java.nio.channels.ClosedChannelException: null

    Results:

    Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

  2. Hey vIns. I ran the code and it works fine, except that on the server side metadata.get(Constants.FILE_EXTN) and metadata.get(Constants.FILE_NAME) both return null, so the file created is named null.null. Please help me solve this problem.
