MongoDB Change Streams With Reactive Spring Data

Overview:

In this tutorial, I would like to show you how to set up a MongoDB replica set using docker-compose and access MongoDB Change Streams with the Reactive Spring Data MongoDB template.

Replica Sets:

A replica set is a group of MongoDB instances that maintain the same data set. Replica sets provide redundancy and high availability.

Diagram of default routing of reads and writes to the primary.

MongoDB Change Streams:

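MongoDB Change Streams allow applications to access real-time data changes: an application can subscribe to all data changes on a single collection, a database, or an entire deployment, and react to them immediately. Change streams are only available on replica sets and sharded clusters, not on a standalone mongod, which is why we need a replica set in the first place.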
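Spring's reactive template exposes a change stream as a Reactor Flux, which is a Reactive Streams publisher that you subscribe to once and then react to each event. As a JDK-only analogy (this is not the MongoDB or Spring API), the sketch below lets java.util.concurrent.SubmissionPublisher play the role of the change stream. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// JDK-only analogy of subscribing to a change stream; NOT the MongoDB/Spring API.
// ChangeStreamAnalogy and collectEvents are hypothetical names.
public class ChangeStreamAnalogy {

    // Subscribes before any change is published, then records every event it is notified of.
    public static List<String> collectEvents(List<String> changes) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch completed = new CountDownLatch(1);
        try (SubmissionPublisher<String> stream = new SubmissionPublisher<>()) {
            stream.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // unbounded demand
                }

                @Override
                public void onNext(String event) {
                    received.add(event); // "react" to the change
                }

                @Override
                public void onError(Throwable error) {
                    completed.countDown();
                }

                @Override
                public void onComplete() {
                    completed.countDown();
                }
            });
            changes.forEach(stream::submit); // each submit plays the role of a write
        } // close() signals onComplete after the already submitted events
        try {
            completed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(collectEvents(List.of("insert movie 1", "insert movie 2")));
    }
}
```

The real change stream differs in that it never completes on its own; the subscriber keeps receiving events until it cancels.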

Docker-Compose Setup:

The MongoDB documentation provides clear steps to set up a replica set with 3 instances. Each member of the set is started with the replica set name and the IP addresses or hostnames it should bind to, as shown here.

mongod --replSet "rs0" --bind_ip localhost,My-Example-Associated-Hostname

When we start the 3 instances, they run as 3 separate, uninitialized members. They do not know about each other, so they cannot form a replica set on their own. To form one, we initiate the replica set by running a command (listing all the members' hosts and ports) in the mongo shell connected to one of the instances. The members then hold an election: one becomes the primary and the other 2 become secondaries, and from then on they can talk to each other.

Even though this process sounds simple, it is a little tricky to do via docker-compose: 3 Docker containers run the mongod process, and we would have to manually access one of them to initiate the replica set.

This is where we use the init-container approach: we run 4 mongo containers to form a 3-member cluster. The fourth container acts as an init container. It starts mongod in the background and then uses the mongo shell to connect to mongo1 remotely and initiate the replica set.

Let's assume that I run 3 MongoDB instances named mongo1, mongo2 and mongo3. Then I need to initiate the replica set as follows (this is the rs/mongo-init.js file). Here vinsguru is the replica set name. It can be anything, as long as it matches the --replSet value each mongod is started with.

rs.initiate({
   "_id":"vinsguru",
   "members":[
      {
         "_id":0,
         "host":"mongo1:27017"
      },
      {
         "_id":1,
         "host":"mongo2:27017"
      },
      {
         "_id":2,
         "host":"mongo3:27017"
      }
   ]
});
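The member list follows a regular pattern: the document's _id is the replica set name, and each member gets a unique numeric _id plus a host:port string. If you need a different number of members, a small generator like the hypothetical helper below (plain Java, not part of the original project) can produce the same rs.initiate call.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper (not part of the original project): renders an rs.initiate(...) call.
public class ReplicaSetConfig {

    public static String initiateCommand(String setName, List<String> hosts, int port) {
        StringJoiner members = new StringJoiner(",", "[", "]");
        for (int i = 0; i < hosts.size(); i++) {
            // each member gets a unique numeric _id and a host:port string
            members.add(String.format("{\"_id\":%d,\"host\":\"%s:%d\"}", i, hosts.get(i), port));
        }
        // the top-level _id must equal the --replSet name passed to every mongod
        return String.format("rs.initiate({\"_id\":\"%s\",\"members\":%s});", setName, members);
    }

    public static void main(String[] args) {
        System.out.println(initiateCommand("vinsguru", List.of("mongo1", "mongo2", "mongo3"), 27017));
    }
}
```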

The init container runs the following script (rs/init.sh) on startup.

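#!/bin/bash

# Start mongod in the background (--fork) with the vinsguru replica set name,
# then connect to mongo1 with the mongo shell and initiate the replica set.
# The container exits once this script finishes.
# Note: MongoDB 6.0+ images ship mongosh instead of the legacy mongo shell.
mongod --fork --logpath /var/log/mongod.log --replSet vinsguru --port 27017 --bind_ip_all

# give the other members a few seconds to start
sleep 5

mongo --host mongo1 < rs/mongo-init.js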

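I keep these 2 scripts, rs/mongo-init.js and rs/init.sh, in an rs directory in the project root; docker-compose mounts it into the init container as /rs. Make sure init.sh is executable, since the container runs it directly.

The docker-compose file that creates the Mongo replica set looks like this: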

version: "3"
services:
  mongo1:
    image: mongo
    command: --replSet vinsguru --bind_ip_all
  mongo2:
    image: mongo
    command: --replSet vinsguru --bind_ip_all
  mongo3:
    image: mongo
    command: --replSet vinsguru --bind_ip_all
  init-container:
    image: mongo
    depends_on:
      - mongo1
      - mongo2
      - mongo3
    volumes:
      - ./rs:/rs
    command: ./rs/init.sh

Now let's see how to access the change streams and test the replica set's high availability.

Project Setup:

Let's first create a simple Spring Boot project. Based on the code below, it needs the Reactive MongoDB starter (spring-boot-starter-data-mongodb-reactive) and Lombok.

Sample Application:

I have a list of movies at the path below. Our application connects to the MongoDB cluster and keeps inserting the movies into a collection, one at a time.

src/main/resources/movies.json

Whenever we insert a document into the collection, we check whether it shows up in the stream of inserts/updates. We will also abruptly stop the primary instance and see whether it affects our Spring Boot application. When the primary goes down, one of the secondary instances should be elected as the new primary.
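Why does the failover work with 3 members? A primary can only be elected while a strict majority of the voting members can reach each other, i.e. floor(n / 2) + 1 of n. The small plain-Java helper below (hypothetical names) computes that majority and the resulting fault tolerance: with 3 members the majority is 2, so stopping one member still allows an election, while stopping two would leave the set without a primary.

```java
// Hypothetical helper illustrating replica set election arithmetic.
public class ReplicaSetMath {

    // Number of voting members that must be reachable to elect a primary.
    public static int majority(int votingMembers) {
        return votingMembers / 2 + 1;
    }

    // How many members can fail while an election is still possible.
    public static int faultTolerance(int votingMembers) {
        return votingMembers - majority(votingMembers);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.printf("members=%d majority=%d tolerance=%d%n", n, majority(n), faultTolerance(n));
        }
    }
}
```

Note that adding a fourth member does not improve fault tolerance (majority 3, tolerance 1), which is why replica sets usually have an odd number of voting members.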

Entity:

This is our entity class to represent the movie.

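import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

// @Data generates getters, setters, equals/hashCode and toString
@Data
@Document
public class Movie {

    @Id
    private String id;
    private String title;
    private int year;
    private String genre;
    private double imdbRating;

}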

Reactive Spring Data – MongoDB Template:

We have set up our MongoDB replica set. Now let's create a simple application that uses the reactive Spring Data MongoDB template to insert new documents periodically, one by one. The same template also gives us access to the MongoDB change stream for the collection.

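import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.io.Resource;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
import reactor.core.publisher.Flux;

@SpringBootApplication
@EnableReactiveMongoRepositories
public class ReactiveMoviesApplication implements CommandLineRunner {

    @Autowired
    private ReactiveMongoTemplate reactiveMongoTemplate;

    @Value("classpath:movies.json")
    private Resource resource;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveMoviesApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {

        // read the movies from src/main/resources/movies.json
        ObjectMapper objectMapper = new ObjectMapper();
        List<Movie> movieList = objectMapper.readValue(resource.getInputStream(), new TypeReference<List<Movie>>() {});

        // insert records one by one every 2 seconds (runs asynchronously)
        Flux.fromIterable(movieList)
                .delayElements(Duration.ofSeconds(2))
                .flatMap(this.reactiveMongoTemplate::save)
                .doOnComplete(() -> System.out.println("Complete"))
                .subscribe();

        // access the change stream of the movie collection for one minute;
        // take(Duration) completes the stream, whereas blockLast(timeout) would throw
        this.reactiveMongoTemplate
                .changeStream(Movie.class)
                .listen()
                .doOnNext(System.out::println)
                .take(Duration.ofMinutes(1))
                .blockLast();
    }
}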

Dockerfile:

My application Dockerfile is shown below. It intentionally waits 10 seconds before starting the app, to give the cluster time to form.

FROM openjdk:11.0.6-jre-slim

WORKDIR /usr/app

ADD target/*jar app.jar

CMD sleep 10 && java -jar app.jar
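A fixed sleep is simple but only a guess. One alternative is to poll each member's port until it accepts TCP connections. The plain-Java sketch below (hypothetical class, not part of the original project) retries a connection a bounded number of times. Keep in mind that an open port only means mongod is listening; it does not guarantee the replica set has already elected a primary.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical readiness check: retries a TCP connect until host:port accepts it.
public class WaitForPort {

    // Returns true once host:port accepts a connection, false after maxAttempts failures.
    public static boolean waitFor(String host, int port, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 1000); // 1 s connect timeout
                return true;
            } catch (IOException notReadyYet) {
                // connection refused, unknown host or timeout: try again after a delay
            }
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "mongo1";
        System.out.println(host + " reachable: " + waitFor(host, 27017, 30, 1000));
    }
}
```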

My complete docker-compose file, including the application service, is shown below.

version: "3"
services:
  mongo1:
    image: mongo
    command: --replSet vinsguru --bind_ip_all
  mongo2:
    image: mongo
    command: --replSet vinsguru --bind_ip_all
  mongo3:
    image: mongo
    command: --replSet vinsguru --bind_ip_all
  init-container:
    image: mongo
    depends_on:
      - mongo1
      - mongo2
      - mongo3
    volumes:
      - ./rs:/rs
    command: ./rs/init.sh
  app:
    build: .
    image: vinsdocker/mongo-replica-set-demo

Application Properties:

spring.data.mongodb.uri=mongodb://mongo1,mongo2,mongo3/test?replicaSet=vinsguru
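The URI lists all 3 members as a seed list, the database (test) and the replica set name. To my understanding, the MongoDB driver uses the seeds to discover the whole replica set and keeps monitoring it, which is what lets the app find the new primary after a failover. The simplified parser below (hypothetical, plain Java; the real driver also handles credentials, IPv6, mongodb+srv and many more options) only shows how this particular URI breaks down, filling in the default port 27017.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified parser for "mongodb://host1,host2/db?key=value" URIs without credentials.
public class MongoUriSketch {

    private static String afterScheme(String uri) {
        return uri.substring("mongodb://".length());
    }

    // Seed hosts, with the default MongoDB port added where none is given.
    public static List<String> seedHosts(String uri) {
        String rest = afterScheme(uri);
        int slash = rest.indexOf('/');
        String hostPart = slash >= 0 ? rest.substring(0, slash) : rest;
        List<String> hosts = new ArrayList<>();
        for (String host : hostPart.split(",")) {
            hosts.add(host.contains(":") ? host : host + ":27017");
        }
        return hosts;
    }

    // Database name between "/" and "?" (empty if absent).
    public static String database(String uri) {
        String rest = afterScheme(uri);
        int slash = rest.indexOf('/');
        if (slash < 0) {
            return "";
        }
        int query = rest.indexOf('?');
        return rest.substring(slash + 1, query >= 0 ? query : rest.length());
    }

    // Query options such as replicaSet=vinsguru.
    public static Map<String, String> options(String uri) {
        Map<String, String> options = new HashMap<>();
        int query = uri.indexOf('?');
        if (query < 0) {
            return options;
        }
        for (String pair : uri.substring(query + 1).split("&")) {
            String[] keyValue = pair.split("=", 2);
            options.put(keyValue[0], keyValue.length > 1 ? keyValue[1] : "");
        }
        return options;
    }

    public static void main(String[] args) {
        String uri = "mongodb://mongo1,mongo2,mongo3/test?replicaSet=vinsguru";
        System.out.println(seedHosts(uri) + " " + database(uri) + " " + options(uri));
    }
}
```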

Demo:

At this point, everything is ready for the demo. Run these commands one by one.

  • Package the application
mvn clean package
  • Build the docker image for the application.
docker-compose build
  • Run the application by issuing below command
docker-compose up
  • Now we can see the mongo instances form a replica set. Our app starts after 10 seconds and begins inserting documents one by one. The reactive mongo template receives a change stream event for every insert, regardless of genre.

  • If we are only interested in a specific genre, such as comedy, we can add a filter (Criteria is org.springframework.data.mongodb.core.query.Criteria).
this.reactiveMongoTemplate
        .changeStream(Movie.class)
        .filter(Criteria.where("genre").is("comedy"))
        .listen()
        .doOnNext(System.out::println)
        .take(Duration.ofMinutes(1))
        .blockLast();
  • While the application is running, run this command in another terminal to stop mongo1, which is normally the primary right after initiation (rs.status() shows the current primary if you want to be sure).
docker-compose stop mongo1
  • If we check the logs now, we can see that mongo1 is no longer running, but our app keeps working against the newly elected primary.
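As far as I understand Spring Data MongoDB's change stream support, the genre criterion in the steps above is matched against the fullDocument field of each change event, so only events whose document has genre comedy are delivered. The plain-Java sketch below (hypothetical class and made-up titles, no MongoDB involved) mimics that $match on fullDocument.genre over in-memory events. Delete events carry no fullDocument (and, unless full-document lookup is enabled, neither do update events), so such a filter never matches them.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java stand-in for filtering change events; no MongoDB or Spring classes involved.
public class GenreFilterSketch {

    // Hypothetical, minimal model of a change event: operation type plus full document (may be null).
    public static final class ChangeEvent {
        public final String operationType;
        public final Map<String, Object> fullDocument;

        public ChangeEvent(String operationType, Map<String, Object> fullDocument) {
            this.operationType = operationType;
            this.fullDocument = fullDocument;
        }

        @Override
        public String toString() {
            return operationType + " " + fullDocument;
        }
    }

    // Mimics a {$match: {"fullDocument.genre": genre}} stage: events without a full document never match.
    public static List<ChangeEvent> matchGenre(List<ChangeEvent> events, String genre) {
        return events.stream()
                .filter(e -> e.fullDocument != null && genre.equals(e.fullDocument.get("genre")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = Arrays.asList(
                new ChangeEvent("insert", Map.of("title", "Movie A", "genre", "comedy")),
                new ChangeEvent("insert", Map.of("title", "Movie B", "genre", "drama")),
                new ChangeEvent("delete", null));
        System.out.println(matchGenre(events, "comedy"));
    }
}
```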

Summary:

We successfully set up a MongoDB replica set using docker-compose, accessed the MongoDB change streams, and verified the replica set's high availability.

The source code is available here.

Learn more about MongoDB + Reactive Spring Data here.

Happy learning 🙂
