Reactor Flux File Reading

Overview:

In this tutorial, I would like to quickly show you a Reactor Flux file reading example: how to access a file, read its content as a stream, and close the file with Project Reactor, so that resources are consumed and cleaned up properly.

We have been discussing reactive programming on this blog for a while, and there are already a few articles on the topic. Please check them out!

Java – try-with-resources:

If you need to read a file in Java, you first open it with a reader object (for example a FileReader or a BufferedReader), read the content as bytes or strings, and then close the file once you are done. This is the same as reading records from a DB: open a connection, execute the query, and then close the connection.

The point here is that people often forget the very important step of closing resources (a file or a DB connection) once they are consumed. Such leaks are hard to spot immediately, but they lead to a lot of issues later, when the application is in production. A responsible developer would close the file after reading the content, as shown here. However, the file still might not get closed if an exception is thrown, because control jumps straight into the catch block.

try{
    BufferedReader br = Files.newBufferedReader(Paths.get("/some/large-file.txt"));
    br.readLine(); // some exception here
    br.close();    // this will not execute
}catch (Exception e){
    e.printStackTrace();
}

This is why Java 7 introduced the try-with-resources statement: a try block with the resource declared in parentheses, try (...).

try(BufferedReader br = Files.newBufferedReader(Paths.get("/some/large-file.txt"))){
    br.readLine();   // read file and do what you need to do.
}catch (Exception e){
    e.printStackTrace();
}
// file would have been closed here.

The assumption here is that any resource you create within the try (...) implements the AutoCloseable interface, so that Java can close it for us when we exit the try block. This works even when an exception is thrown within the try block.
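To make the ordering concrete, here is a minimal sketch using only the JDK. TrackedResource is a hypothetical class made up for this illustration; it simply records when it is opened and closed, so we can see that close() runs before the catch block even though the read fails.

```java
import java.util.ArrayList;
import java.util.List;

class TryWithResourcesDemo {

    // Hypothetical resource (not a JDK class) that logs its lifecycle events.
    static class TrackedResource implements AutoCloseable {
        private final List<String> events;

        TrackedResource(List<String> events) {
            this.events = events;
            events.add("open");
        }

        void read() {
            // Simulate a failure while the resource is in use.
            throw new IllegalStateException("read failed");
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        try (TrackedResource resource = new TrackedResource(events)) {
            resource.read();             // throws, so the next line is skipped
            events.add("after read");
        } catch (IllegalStateException e) {
            events.add("caught");        // runs only after close() has been called
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());       // prints [open, close, caught]
    }
}
```

Note that the resource is closed before the catch block runs, which is why "close" is recorded ahead of "caught".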

Now let's discuss how we can consume resources like this in reactive programming and clean them up afterwards.

Flux/Mono – Using:

Project Reactor provides a method called 'using' on Flux/Mono for this purpose. It is very generic, and it is completely up to us how we create the resource, use it, and clean it up or close it at the end.

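// pseudocode: the three arguments of using
Flux.using(
    () -> resource,                // supplier: create or open the resource
    resource -> publisher,         // build the Publisher that emits data from the resource
    resource -> cleanUp(resource)  // cleanup: runs on completion, error or cancellation
)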

Before we discuss files, let's take a very simple example. Let's consider a List as the resource, and clearing all of its items as cleaning up the resource. In the example below, I am intentionally wrapping the list in new ArrayList because Arrays.asList returns a fixed-size list, and calling its clear method would throw an UnsupportedOperationException.

// resource
List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3));

// creating a flux with resources
Flux<Integer> flux = Flux.using(
        () -> list,
        (l) -> Flux.fromIterable(l),
        (l) -> l.clear()
);

// print the size of the resource
System.out.println("Before : " + list.size());

// consume the resource
flux.subscribe(System.out::println);

// just wait for resource consumption
// (not strictly needed here: without a scheduler, this flux is consumed on the calling thread)
Thread.sleep(3000);

// print the size of the resource
System.out.println("After : " + list.size());

Output:

Before : 3
1
2
3
After : 0
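The cleanup is not limited to the happy path. As a small sketch (assuming reactor-core is on the classpath; the class and method names are made up for illustration), the variation below makes the stream fail halfway through with a division by zero, and the list is still cleared.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class UsingErrorDemo {

    // Consumes the given list through Flux.using and returns the values
    // received before the stream failed.
    static List<Integer> run(List<Integer> resource) {
        List<Integer> received = new ArrayList<>();

        Flux<Integer> flux = Flux.using(
                () -> resource,
                l -> Flux.fromIterable(l).map(i -> 10 / (i - 2)), // fails when i == 2
                l -> l.clear()                                    // cleanup
        );

        flux.subscribe(
                received::add,
                error -> System.out.println("Error : " + error.getMessage())
        );
        return received;
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println("Received : " + run(list));
        System.out.println("After : " + list.size());
    }
}
```

Only the first element makes it through before the error, and the size printed afterwards is 0 because the cleanup ran when the stream terminated with an error.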

If the above example gives you some idea, then let's take a look at the file reader example. The 'using' method is important because in reactive programming, work is often asynchronous.

Let's consider this example. We have a 1 GB file. A publisher reads the file and emits each line, one by one, on a thread. A subscriber could be subscribing to this publisher from a different thread in some other class. The subscriber might also be interested only in the first few MB of data, after which it could cancel the subscription. In that case, the publisher should not keep on emitting data. The publisher can safely close the file unless some other subscriber asks for it.

In plain Java, with try-with-resources, opening and closing the file happen in a single block. In the reactive programming world, the publisher and its subscribers could be in different classes, so we would not know when to close the file, and it could be challenging to debug such issues. So it is great that Project Reactor has considered this and provided a method for consuming resources safely.
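Here is a minimal sketch of the cancellation case, assuming reactor-core is on the classpath. The class name, the temp file and the AtomicBoolean flag are all made up for this illustration. The subscriber takes only the first few lines, which cancels the subscription, and the cleanup function still closes the file.

```java
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class UsingCancelDemo {

    // Reads only the first `count` lines of a small temp file into `received`
    // and reports whether the cleanup function closed the underlying stream.
    static boolean readFirstLines(int count, List<String> received) {
        try {
            Path file = Files.createTempFile("using-cancel", ".txt");
            Files.write(file, Arrays.asList("line-1", "line-2", "line-3", "line-4"));

            AtomicBoolean closed = new AtomicBoolean(false);

            Flux<String> lines = Flux.using(
                    () -> Files.lines(file),
                    stream -> Flux.fromStream(stream),
                    stream -> {
                        stream.close();
                        closed.set(true);   // record that the cleanup ran
                    }
            );

            // take(count) cancels the upstream once enough lines have arrived
            lines.take(count).subscribe(received::add);

            Files.delete(file);
            return closed.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

No scheduler is involved here, so everything runs on the calling thread and the flag can be checked right after subscribe returns.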

Reactor Flux File Reading:

I have created a 1 GB text file with 35 million lines for this purpose. Let's assume that a publisher reads this file and emits the data line by line. A subscriber consumes the data, may process the content (lowercase to uppercase, etc.), and writes the data into a different file. At the end, we will see an additional 1 GB file.

 

  • Resource handlers:
// private methods to handle checked exceptions

private void close(Closeable closeable){
    try {
        closeable.close();
        System.out.println("Closed the resource");
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

private void write(BufferedWriter bw, String string){
    try {
        bw.write(string);
        bw.newLine();
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
  • Flux creation
// input file
Path ipPath = Paths.get("/some/path/large-input-file.txt");

Flux<String> stringFlux = Flux.using(
        () -> Files.lines(ipPath),
        Flux::fromStream,
        Stream::close
);
  • Subscriber
// output file
Path opPath = Paths.get("/some/path/large-output-file.txt");
BufferedWriter bw = Files.newBufferedWriter(opPath, StandardOpenOption.CREATE, StandardOpenOption.APPEND);

stringFlux.subscribe(
        s -> write(bw, s),   // onNext: write each line to the output file
        e -> close(bw),      // onError: close the output file
        () -> close(bw)      // onComplete: close the output file
);

When I run the above code, the new file is created with the content pushed by the publisher, and it works perfectly. The files are closed as soon as the copy is complete.
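For a version that can be run end to end, here is a sketch with small temp files instead of the 1 GB file, assuming reactor-core is on the classpath. It is a variation on the code above, not the same code: the writer is wrapped in its own Flux.using, so that it is also closed if the subscription is cancelled. The class and method names are made up for illustration.

```java
import reactor.core.publisher.Flux;

import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class FluxFileCopyDemo {

    private static void close(Closeable closeable) {
        try {
            closeable.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void write(BufferedWriter bw, String line) {
        try {
            bw.write(line);
            bw.newLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Copies input to output in upper case; both files are managed by Flux.using.
    static void copyUpperCase(Path input, Path output) {
        Flux<String> lines = Flux.using(
                () -> Files.lines(input),
                stream -> Flux.fromStream(stream),
                Stream::close);

        Flux<String> copied = Flux.using(
                () -> Files.newBufferedWriter(output),
                writer -> lines.map(String::toUpperCase)
                               .doOnNext(line -> write(writer, line)),
                FluxFileCopyDemo::close);

        copied.blockLast();   // block only to keep this demo simple
    }

    // Runs the copy on two temp files and returns the lines of the output file.
    static List<String> demo() {
        try {
            Path input = Files.createTempFile("copy-input", ".txt");
            Path output = Files.createTempFile("copy-output", ".txt");
            Files.write(input, Arrays.asList("hello", "world"));
            copyUpperCase(input, output);
            List<String> result = Files.readAllLines(output);
            Files.delete(input);
            Files.delete(output);
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling FluxFileCopyDemo.demo() should return the two input lines in upper case. The writer is closed, and therefore flushed, before the completion signal is delivered, so the output file is complete by the time blockLast returns.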

Reactor Flux File Reading With Multiple Subscribers:

Our use case could be different: there could be multiple subscribers interested in consuming the data, each doing its own processing and saving its own file. We do not want to read the file 3 times, once per subscriber. The file can be read just once by creating a hot publisher, as shown here.

Do note that file reading and writing are I/O-intensive activities. If you simply create the 3 subscribers and connect them to the publisher, it will not work: only the first file is created successfully. The second and third files are not created, because everything runs on the main thread and it stays busy creating the first file. By the time the second subscriber starts consuming the data, the publisher has already emitted all of it!

This is when we need to use publishOn/subscribeOn to run this activity on different threads.

Flux<String> stringFlux = Flux.using(
        () -> Files.lines(ipPath),
        Flux::fromStream,
        Stream::close
)
 .subscribeOn(Schedulers.newParallel("file-copy", 3))
 .share();
  • subscribeOn moves the file reading onto a scheduler thread, so the main thread is free to register the remaining subscribers.
  • The share method creates a hot publisher. Without it, we would have a cold publisher, which reads the file for each subscriber. That would mean opening and reading the same file 3 times, an unnecessary waste of resources.

After making this simple code change, we can connect 3 subscribers. Each one gets the file data from the publisher, and they can all do whatever they want with it at the same time.
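One thing to keep in mind with share is that a subscriber which joins after the reading has started may miss the lines emitted before it subscribed. The sketch below is an alternative under that concern, not the code used above: it swaps share() for publish().autoConnect(n), which waits for n subscribers before it starts reading. It assumes reactor-core is on the classpath; the class name, the temp file and the choice of Schedulers.boundedElastic() are made up for illustration.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

class HotPublisherDemo {

    // Reads a small temp file once and fans its lines out to several subscribers.
    static List<List<String>> fanOut(int subscriberCount) {
        try {
            Path file = Files.createTempFile("hot-publisher", ".txt");
            Files.write(file, Arrays.asList("a", "b", "c"));

            Flux<String> lines = Flux.using(
                    () -> Files.lines(file),
                    stream -> Flux.fromStream(stream),
                    Stream::close);

            Flux<String> hot = lines
                    .subscribeOn(Schedulers.boundedElastic()) // read off the main thread
                    .publish()
                    .autoConnect(subscriberCount);            // start when everyone has subscribed

            CountDownLatch done = new CountDownLatch(subscriberCount);
            List<List<String>> results = new ArrayList<>();
            for (int i = 0; i < subscriberCount; i++) {
                List<String> received = new CopyOnWriteArrayList<>();
                results.add(received);
                hot.subscribe(received::add, error -> done.countDown(), done::countDown);
            }

            // wait until every subscriber has seen the completion (or error) signal
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for subscribers");
            }
            Files.delete(file);
            return results;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With fanOut(3), each of the three result lists should contain all three lines, while the file itself is opened and read only once.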

Summary:

We were able to successfully demonstrate Reactor Flux file reading with the simple and powerful 'using' method available in Flux/Mono. Properly consuming resources, closing connections and so on are often overlooked, and that has always caused issues in production which are very difficult to debug. It is great to see Project Reactor providing methods for consuming resources safely.

Learn more about Reactor / Reactive Programming.

Happy learning 🙂

 
