Site icon Vinsguru

RSocket File Upload Example

Overview:

In this tutorial, I would like to show you RSocket File Upload – how we could upload large files in a complete reactive programming style with back-pressure support.

RSocket is a message passing protocol for multiplexed, duplex communication which supports TCPWebSockets and Aeron (UDP).  If you are new to RSocket, take a look at these articles to learn more.

RSocket File Upload:

In an application, when an one component , say A, is very slow in responding to a request, the request sender might want to slow down the rate of sending the requests to avoid further stress on the component under load. Otherwise the component A might crash / lose data etc. With Reactive programming / back-pressure support, the sender is aware of the component A’s rate of processing the requests and sends the request only when the component A is capable of processing. This also helps the sender not to do too much work from its side.

For ex: Lets assume there is a client which tries to upload 10 GB file. The client might have 100 GB RAM and might even hold the entire file in the memory and try to send the upload request to the server. But the poor server with 1 GB RAM might not be able to process the request or It might take significant amount of time to process such request and respond to the client that the request is complete / failed.

We also do not want to block the connection between client and server during this processing. A request might fail after 90% of upload for some reason. In that case the client might want to send the same request again to the server. If you see, there are many issues with this approach.

It is good if the server processes the request as few file chunks and write as and when it can as shown here. So that client knows the progress of the file upload. If a specific chunk is lost, the client can resend that chunk instead of sending the whole file. The client can also do other tasks if the server is very slow.

Let’s see how we can achieve this using rsocket + Spring Boot.

Sample Application:

Just create a simple Spring Boot application with RSocket dependency.

We will develop a simple client and server application which will do the following.

Models:

public enum Status {

    CHUNK_COMPLETED,
    COMPLETED,
    FAILED;

}
public class Constants {

    public static final String MIME_FILE_EXTENSION   = "message/x.upload.file.extension";
    public static final String MIME_FILE_NAME        = "message/x.upload.file.name";
    public static final String FILE_NAME = "file-name";
    public static final String FILE_EXTN = "file-extn";

}

RSocket File Upload – Server Side:

@Service
public class FileUploadService {

    @Value("${output.file.path:src/test/resources/output}")
    private Path outputPath;

    public Flux<Status> uploadFile(Path path, Flux<DataBuffer> bufferFlux) throws IOException {
        Path opPath = outputPath.resolve(path);
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(opPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        return DataBufferUtils.write(bufferFlux, channel)
                            .map(b -> Status.CHUNK_COMPLETED);
    }

}
@Controller
public class FileUploadController {

    @Autowired
    private FileUploadService service;

    @MessageMapping("file.upload")
    public Flux<Status> upload(@Headers Map<String, Object> metadata, @Payload Flux<DataBuffer> content) throws IOException {
        var fileName = metadata.get(Constants.FILE_NAME);
        var fileExtn = metadata.get(Constants.FILE_EXTN);
        var path = Paths.get(fileName + "." + fileExtn);
        return Flux.concat(service.uploadFile(path, content), Mono.just(Status.COMPLETED))
                    .onErrorReturn(Status.FAILED);

    }

}
spring.rsocket.server.port=6565

Configuration:

@Configuration
public class RSocketConfig {

    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .metadataExtractorRegistry(metadataExtractorRegistry -> {
                    metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(Constants.MIME_FILE_EXTENSION), String.class, Constants.FILE_EXTN);
                    metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(Constants.MIME_FILE_NAME), String.class, Constants.FILE_NAME);
                })
                .build();
    }

    @Bean
    public Mono<RSocketRequester> getRSocketRequester(RSocketRequester.Builder builder){
        return builder
                .rsocketConnector(rSocketConnector -> rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
                .connect(TcpClientTransport.create(6565));
    }

}

RSocket File Upload – Client Side:

@SpringBootTest
class FileUploadApplicationTests {

    @Autowired
    private Mono<RSocketRequester> rSocketRequester;

    @Value("classpath:input/java_tutorial.pdf")
    private Resource resource;

    @Test
    public void uploadFile()  {

        // read input file as 4096 chunks
        Flux<DataBuffer> readFlux = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 4096)
                .doOnNext(s -> System.out.println("Sent"));

        // rsocket request
        this.rSocketRequester
                .map(r -> r.route("file.upload")
                        .metadata(metadataSpec -> {
                            metadataSpec.metadata("pdf", MimeType.valueOf(Constants.MIME_FILE_EXTENSION));
                            metadataSpec.metadata("output", MimeType.valueOf(Constants.MIME_FILE_NAME));
                        })
                        .data(readFlux)
                )
                .flatMapMany(r -> r.retrieveFlux(Status.class))
                .doOnNext(s -> System.out.println("Upload Status : " + s))
                .subscribe();
        
    }

}
...
...
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Sent
Upload Status : CHUNK_COMPLETED
Upload Status : COMPLETED

Back Pressure Test:

Lets simulate the slow server by slowing down the write on the server side. Lets assume each chunk takes 1 second. I added the delay element to simulate that.

public Flux<Status> uploadFile(Path path, Flux<DataBuffer> bufferFlux) throws IOException {
    Path opPath = outputPath.resolve(path);
    AsynchronousFileChannel channel = AsynchronousFileChannel.open(opPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    return DataBufferUtils.write(bufferFlux.delayElements(Duration.ofSeconds(1)), channel)
                        .map(b -> Status.CHUNK_COMPLETED);
}

We will see a slightly different result this time. We will see 32 sent requests (chunks) first. (It is a default initial request which can be adjusted via reactor system property). As soon as client realizes that it has not received any response, it will not send any more request. Once it receives some responses, then it will send few more requests and wait for response. It will continue until the file upload process is 100% complete.

Sent
Sent
...
...
Sent
Sent
Upload Status : CHUNK_COMPLETED
...
...
Upload Status : CHUNK_COMPLETED
Sent
Sent
...
...
Sent
...
Upload Status : CHUNK_COMPLETED
...
...
Upload Status : CHUNK_COMPLETED

Without the back pressure support, the client would have sent everything – like thousands of chunk requests and waited forever to know later that it was never uploaded!!

Summary:

We were able to successfully demonstrate RSocket File Upload with Spring Boot.

Learn more about RSocket / Reactive Programming.

The complete source code is here.

Happy learning 🙂

 

Share This:

Exit mobile version