Vinsguru

MongoDB Tailable Cursor With WebFlux

Overview:

In this tutorial, I would like to show you how to use a MongoDB tailable cursor (an infinite stream) with Spring Data Reactive MongoDB. With this approach, a Spring Boot application can automatically notify its users whenever new records are inserted into a collection.

MongoDB Tailable Cursor:

By default, when we query a DB, we get the results that match the conditions at the time of execution, and then the cursor is closed. A tailable cursor, in contrast, remains open after the initial results are returned: when we query a capped collection with it, we keep getting notified whenever a new record that satisfies the condition arrives. A capped collection is a fixed-size collection: its maximum size in bytes (and, optionally, its maximum number of records) is fixed. That is, if we configure the collection to hold at most 50 records, inserting a new record evicts the oldest one. In other words, the collection always keeps the 50 most recent records.
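The eviction behaviour described above can be pictured with a small in-memory sketch. This is only an illustration of the semantics, not how MongoDB implements capped collections. The CappedBuffer class and its method names are hypothetical, and it models only the document-count limit, not the byte-size limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of a capped collection (illustration only).
final class CappedBuffer<T> {

    private final int maxDocuments;
    private final Deque<T> documents = new ArrayDeque<>();

    CappedBuffer(int maxDocuments) {
        if (maxDocuments <= 0) {
            throw new IllegalArgumentException("maxDocuments must be positive");
        }
        this.maxDocuments = maxDocuments;
    }

    // Appends a document; if the cap is already reached, the oldest one is evicted first.
    void insert(T document) {
        if (documents.size() == maxDocuments) {
            documents.removeFirst();
        }
        documents.addLast(document);
    }

    // Returns a copy of the retained documents in insertion order (oldest first).
    List<T> snapshot() {
        return new ArrayList<>(documents);
    }
}
```

With a cap of 3, inserting five titles one after another leaves only the last three in the buffer, in the order they were inserted.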

Project Setup:

Let's first create a simple Spring Boot project with the dependencies used by the code in this tutorial: Spring WebFlux, Spring Data Reactive MongoDB, and Lombok.

Sample Application:

We are going to develop a Netflux application, which is something like Netflix. Its users would like to be notified about movies in specific genres such as action or comedy. Our application needs a feature that notifies users as soon as a new movie is added to MongoDB.

MongoDB Setup:

I use docker-compose to set up MongoDB as shown below. (I use the admin DB. You can adjust this.)

version: "3"
services:
  mongo:
    image: mongo
    container_name: mongo
    ports:
      - 27017:27017    
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
  mongo-express:
    image: mongo-express
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: admin
      ME_CONFIG_MONGODB_ADMINPASSWORD: password

Creating Capped Collection:

docker exec -it mongo bash
mongo admin --username admin --password password
db.createCollection("movie", { capped : true, size : 5242880, max : 50 } )

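Note:

In the command above, size is the maximum size of the collection in bytes (5242880 bytes = 5 MB) and max is the maximum number of documents. Recent versions of the mongo image no longer ship the legacy mongo shell; there, use mongosh with the same arguments. We can also create the collection programmatically.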

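// createCollection returns a Mono, so nothing happens until it is subscribed to
// (for example by chaining it into another reactive pipeline).
this.reactiveMongoTemplate.createCollection(
    Movie.class,
    CollectionOptions.empty().capped().size(10_000_000)
            .maxDocuments(20)
);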

Entity:

I have a list of movies downloaded in this format.

{
    "title": "Code Name: K.O.Z.",
    "year": 2015,
    "genre": "crime",
    "imdbRating": 1.6
}

So let's create an entity class for this.

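@Data // Lombok: generates getters, setters, equals/hashCode and toString
@Document // maps to the "movie" collection by default
public class Movie {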

    @Id
    private String id;
    private String title;
    private int year;
    private String genre;
    private double imdbRating;

}

@Tailable:

Creating a MongoDB tailable cursor with a Spring Data Reactive MongoDB repository is simple: just add @Tailable to the query method. Here we query for a specific movie genre, as shown below, and keep the cursor open.

@Repository
public interface MovieRepository extends ReactiveMongoRepository<Movie, String> {

    @Tailable
    Flux<Movie> findByGenre(String genre);

}

Movie Controller:

Users listen for new movie updates via the /movie/action REST endpoint, where action is the genre. It returns a Flux<Movie> as Server-Sent Events (an event stream), so users keep receiving updates from the Spring Boot application. When we call the findByGenre method, the tailable cursor opens, and we keep receiving updates from MongoDB whenever a new record that satisfies the condition is inserted.

@RestController
public class MovieController {

    @Autowired
    private MovieRepository repository;

    @GetMapping(value = "/movie/{genre}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Movie> getMovies(@PathVariable String genre){
        return this.repository.findByGenre(genre)
                .subscribeOn(Schedulers.boundedElastic());
    }

}

Note:

When the capped collection is empty, the flux will complete immediately! So it is better to have at least one document in the capped collection. If not, we can add repeatWhen, as shown here, so that the query is re-subscribed after a one-second delay each time it completes.

@GetMapping(value = "/movie/{genre}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Movie> getMovies(@PathVariable String genre){
    return this.repository.findByGenre(genre)
            .repeatWhen(flux -> flux.delayElements(Duration.ofSeconds(1)))
            .subscribeOn(Schedulers.boundedElastic());
}

Movie Periodic Insert:

Now we need a way to periodically insert data into our collection. For that, I downloaded a movie list as a JSON array and keep that file on the classpath under the name movies.json. We insert a movie document every 2 seconds.

@SpringBootApplication
@EnableReactiveMongoRepositories
public class ReactiveMoviesApplication implements CommandLineRunner {

    @Autowired
    private MovieRepository repository;

    @Value("classpath:movies.json")
    private Resource resource;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveMoviesApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        List<Movie> movieList = objectMapper.readValue(resource.getInputStream(), new TypeReference<>() {});
        Flux.fromIterable(movieList)
                .delayElements(Duration.ofSeconds(2))
                .flatMap(this.repository::save)
                .doOnComplete(() -> System.out.println("Complete"))
                .subscribe();
    }
}

Tailable Cursor – Demo:

At this point, everything is ready. If we start the application, the command line runner will periodically insert the movies. Launch 2 different browsers and access the movies for 2 different genres. We can see the data being pushed to the browsers from the Spring Boot app.
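Instead of a browser, the stream can also be watched from a small Java client. The following is a minimal sketch using the JDK's built-in HttpClient (Java 11+). The class name MovieStreamClient and its methods are hypothetical, the URL passed to listen is an assumption (Spring Boot listens on port 8080 by default, which this tutorial does not state explicitly), and the parsing is simplified to single-line data: fields.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical client for the /movie/{genre} event stream (illustration only).
final class MovieStreamClient {

    // Extracts the payload of an SSE "data:" line.
    // Any other line (blank event separators, comments) yields an empty Optional.
    static Optional<String> dataPayload(String line) {
        if (!line.startsWith("data:")) {
            return Optional.empty();
        }
        return Optional.of(line.substring("data:".length()).trim());
    }

    // Opens the event stream and prints each payload as it arrives.
    // Blocks until the server closes the connection.
    static void listen(String url) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Accept", "text/event-stream")
                .GET()
                .build();
        HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());
        try (Stream<String> lines = response.body()) {
            lines.map(MovieStreamClient::dataPayload)
                 .flatMap(Optional::stream)
                 .forEach(System.out::println);
        }
    }
}
```

Under those assumptions, calling MovieStreamClient.listen("http://localhost:8080/movie/action") should print one JSON document for each matching movie as it is inserted, until the connection is closed.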

Summary:

We were able to successfully demonstrate the MongoDB tailable cursor functionality in a Spring Boot application using Spring Data Reactive MongoDB.

The source code is available here.

Learn more about MongoDB + Reactive Spring Data here.

Happy learning 🙂

 
