MongoDB Tailable Cursor With WebFlux

Overview:

In this tutorial, I would like to show you how to access the MongoDB Tailable Cursor or Infinite Streams with Spring Data Reactive MongoDB. This approach can help us to develop a Spring Boot application to automatically notify our users on the new records getting inserted into a collection.

MongoDB Tailable Cursor:

By default, when we query a DB, we get the results based on conditions at the time of the execution from the db and the cursor will be closed. But the tailable cursor is a cursor which remains open when we query a capped collection and notifies us any new record is arrived satisfying the condition. Capped Collection is a collection in which the max number of records for a collection is fixed. That is, if we configure the collection to have max 50 records, any new record insert will evict the oldest record. In other words, this collection always maintains the recent 50 records.

Project Setup:

Lets first create a simple spring boot project with these dependencies.

spring data mongodb reactive

Sample Application:

We are going to develop a Netflux application which is something like Netflix. The application users would like to receive the notification for specific genres like Action, Comedy movies etc. Our application needs a feature to notify the users as and when a new movie gets added into the MongoDB.

MongoDB Setup:

I use docker-compose to set up MongoDB as shown below. (I use the admin DB. You can adjust this.)

version: "3"
services:
  mongo:
    image: mongo
    container_name: mongo
    ports:
      - 27017:27017    
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
  mongo-express:
    image: mongo-express
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: admin
      ME_CONFIG_MONGODB_ADMINPASSWORD: password

Creating Capped Collection:

  • Enter into the mongo container
docker exec -it mongo bash
  • Access mongo shell for the ‘admin’ database.
mongo admin --username admin --password password
  • Create a capped collection called movie. Here the size is max number of bytes for the collection documents which is a mandatory field. Max is 50 documents – it is optional.
db.createCollection("movie", { capped : true, size : 5242880, max : 50 } )

Note: 

We can also create this programmatically.

this.reactiveMongoTemplate.createCollection(
    Movie.class,
    CollectionOptions.empty().capped().size(10_000_000)
            .maxDocuments(20)
);

Entity:

I have some movies list downloaded in this format.

{
    "title":"Code Name: K.O.Z.",
    "year":2015,
    "genre":"crime",
    "imdbRating":1.6
  }

So lets create an entity class for this.

@Data
@Document
@ToString
public class Movie {

    @Id
    private String id;
    private String title;
    private int year;
    private String genre;
    private double imdbRating;

}

@Tailable:

Creating MongoDB Tailable Cursor using Spring Data Reactive MongoDB Repository is so simple that you just add @Tailable. Here we can query for the specific movie genre as shown below and keep the cursor open.

@Repository
public interface MovieRepository extends ReactiveMongoRepository<Movie, String> {

    @Tailable
    Flux<Movie> findByGenre(String genre);

}

Movie Controller:

Users would be listening for the new movie updates via /movie/action REST endpoint where action is genre. It returns a Flux<Movie> via ServerSentEvents/EventStream. So that users would keep receiving updates from the Spring Boot application. When we call the findByGenre method,  the tailable cursor opens and we would keep receiving updates from MongoDB as and when a new record is inserted satisfying the condition.

@RestController
public class MovieController {

    @Autowired
    private MovieRepository repository;

    @GetMapping(value = "/movie/{genre}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Movie> getMovies(@PathVariable String genre){
        return this.repository.findByGenre(genre)
                .subscribeOn(Schedulers.boundedElastic());
    }

}

Note:

When the capped collection is empty, the flux will complete immediately! So, it is better to have at least 1 document in the capped collection. If not, we can add repeat as shown here.

@GetMapping(value = "/movie/{genre}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Movie> getMovies(@PathVariable String genre){
    return this.repository.findByGenre(genre)
            .repeatWhen(flux -> flux.delayElements(Duration.ofSeconds(1)))
            .subscribeOn(Schedulers.boundedElastic());
}

Movie Periodic Insert:

Now we need a way to periodically insert data into our collection. For that, I had a downloaded a movie list in a json array format & keep that file in the classpath with the file name as movie.json. We insert a movie document every 2 seconds.

@SpringBootApplication
@EnableReactiveMongoRepositories
public class ReactiveMoviesApplication implements CommandLineRunner {

    @Autowired
    private MovieRepository repository;

    @Value("classpath:movies.json")
    private Resource resource;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveMoviesApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        List<Movie> movieList = objectMapper.readValue(resource.getInputStream(), new TypeReference<>() {});
        Flux.fromIterable(movieList)
                .delayElements(Duration.ofSeconds(2))
                .flatMap(this.repository::save)
                .doOnComplete(() -> System.out.println("Complete"))
                .subscribe();
    }
}

Tailable Cursor –  Demo:

Now at this point, everything seems to be ready. If we start the application, the command line runner will periodically insert the movies. Launch 2 different browsers. Try to access the movies for 2 different genres. Now we can see the data being pushed to the browsers from the Spring Boot app.

Summary:

We were able to successfully demonstrate the usage of the MongoDB Tailable Cursor functionality via Spring Boot application using Reactive Spring Data.

The source code is available here.

Learn more about MongoDB + Reactive Spring Data here.

Happy learning 🙂

 

Share This:

9 thoughts on “MongoDB Tailable Cursor With WebFlux

  1. Vins,
    I was desperately looking for a working and real example with a db and not just a data generator!

    Your Article is Great: short, concise with working and useful example!

    A BIG THANK YOU!

    Not only it comes with a reactive db but it shows 1) how to use a dockerized mongodb and 2) how to keep the connection open, a problem I was struggling with for a while.

    I’ve added a simple index.html file to display the movies stream and it did work perfectly.

    Thank you VERY MUCH!

  2. Hi Vins, on the getMovies method, is it possible to execute an update query after the return are sent back to the user to avoid sending the same results again?
    Basically, I’d like to send the movies once: a “sent” boolean property to the movie entity.
    I tried to update the sent movies using flatMap with repo.save call after the repeatWhen and subscribeOn but that didn’t work…
    Thanks!

    1. In this specific scenario, when you say update with ‘sent’ – you might have sent to one user. Might not have sent to other users, rt?. I feel like such relationships should be a separate collection.

      1. Hi Vins, thanks for your response.
        Actually, I’m using your example as a reference to create an application to notify subscribed users. If a notification is sent back to user, then I’d like to flag it so that it’ll not be sent again, next time this user will reconnect. Thanks!

        1. Vins, I tried a different alternative by defining an acknowledgment method that’s called after the user receives the notification.
          This method sets the notification’s ‘sent’ property to true so that it’ll not be sent again.
          However, this didn’t work because mongodb complained with this error “Cannot change the size of a document in a capped collection “.
          Apparently, the db doesn’t like updating capped collections…

          1. All sorted out now by using “fixed” structure (the document property values should always have the same size…).
            Thanks!

  3. Hello, Good tutorial.
    Question:
    How can i do a kind of bind with mongo, i mean, how to know when data was added or deleted, like i do it in firebase when i ibind collections with de database.

    Sorry by mi english.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.