MongoDB Aggregation Pipeline With Reactive Spring Data

Overview:

In this tutorial, I would like to show how to use the MongoDB Aggregation Pipeline to process multiple documents and present the data in the desired format, using both a Reactive Spring Data MongoDB Repository and the Reactive MongoDB Template.

Project Setup:

Let's first create a simple Spring Boot project with these dependencies.

Sample Application:

Let’s consider an application in which freelancers register themselves with their skill sets. The information is stored in MongoDB as shown below.

{
    "name": "sam",
    "age": 40,
    "skills": [ "js", "react", "python"]
}

{
    "name": "jack",
    "age": 38,
    "skills": [ "js", "angular", "postgres"]
}

{
    "name": "james",
    "age": 30,
    "skills": [ "java", "reactor", "mongo"]
}

{
    "name": "smith",
    "age": 32,
    "skills": [ "qa", "selenium"]
}

Users of the application would like to see, for each skill or technology, all the available freelancers. So the UI expects data that looks more or less like this.

{
   "js":[
      "sam",
      "jack"
   ],
   "java":[
      "james"
   ],
   "selenium":[
      "smith"
   ]
...
...
}

Let’s see how to achieve this using Spring Data.

MongoDB Setup:

I use docker-compose to set up MongoDB and insert the above documents.

version: "3"
services:
  mongo:
    image: mongo
    ports:
      - 27017:27017    
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
  mongo-express:
    image: mongo-express
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: admin
      ME_CONFIG_MONGODB_ADMINPASSWORD: password

MongoDB Aggregation Pipeline:

The MongoDB Aggregation Pipeline framework allows us to group multiple documents, process the information and present it in the desired format. For our requirement, we need the following stages in our aggregation pipeline.

  • unwind: Each freelancer has an array of skills, stored in the skills field. This stage deconstructs that array so that a person with 3 skills is output as 3 documents with 1 skill each.
{
   "$unwind":{
      "path":"$skills"
   }
}
  • group: Once the documents are unwound, we can group them by the skills field. For each skill, we take the names and push them into an array.
{
   "$group":{
      "_id":"$skills",
      "names":{
         "$push":"$name"
      }
   }
}
  • project: Once we have grouped the records, we pick the specific fields we are interested in. I exclude _id and expose its value as skill instead; I also keep the names field.
{
   "$project":{
      "_id":0,
      "skill":"$_id",
      "names":1
   }
}
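
To make the effect of the three stages concrete, here is a minimal plain-Java sketch that mimics them in memory on the sample data. It is only an illustration, not MongoDB or Spring Data code: the PipelineSketch class, its nested Freelancer class and the namesBySkill method are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of $unwind -> $group -> $project; not MongoDB or Spring Data code.
class PipelineSketch {

    // Hypothetical stand-in for a stored freelancer document.
    static final class Freelancer {
        final String name;
        final List<String> skills;

        Freelancer(String name, List<String> skills) {
            this.name = name;
            this.skills = skills;
        }
    }

    // Returns skill -> names, keeping skills in the order they are first seen.
    static Map<String, List<String>> namesBySkill(List<Freelancer> freelancers) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (Freelancer f : freelancers) {
            // $unwind: visit one (name, skill) pair per element of the skills array
            for (String skill : f.skills) {
                // $group with $push: append the name to the list kept for this skill
                grouped.computeIfAbsent(skill, k -> new ArrayList<>()).add(f.name);
            }
        }
        // $project: the map key plays the role of "skill", the value the role of "names"
        return grouped;
    }

    public static void main(String[] args) {
        List<Freelancer> sample = List.of(
                new Freelancer("sam", List.of("js", "react", "python")),
                new Freelancer("jack", List.of("js", "angular", "postgres")),
                new Freelancer("james", List.of("java", "reactor", "mongo")),
                new Freelancer("smith", List.of("qa", "selenium")));
        namesBySkill(sample).forEach((skill, names) -> System.out.println(skill + " -> " + names));
    }
}
```

Running main prints one line per skill, starting with js -> [sam, jack]. The real pipeline does the same work inside MongoDB, so only the grouped result travels to the application.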

Now let’s get this implemented.

Entity Class:

Our entity class is represented as shown below.

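import java.util.List;

import lombok.Data;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

// Maps to one freelancer document in MongoDB
@Data
@Document
@ToString
public class Freelancer {

    @Id
    private String id;
    private String name;
    private int age;
    private List<String> skills;

}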

Projection Class:

Since we would like to present the data differently, let's create a model class for that.

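import java.util.List;

import lombok.Data;
import lombok.ToString;

// Holds one result of the aggregation: a skill and the freelancers who have it
@Data
@ToString
public class SkilledPeople {

    private String skill;
    private List<String> names;

}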

@Aggregation:

Reactive Spring Data MongoDB Repository:

Once we have our stages ready, we pass them as the aggregation pipeline stages in the value of the @Aggregation annotation. Do note that the method returns instances of the projection class we defined above.

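import org.springframework.data.mongodb.repository.Aggregation;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;

@Repository
public interface FreelancerRepository extends ReactiveMongoRepository<Freelancer, String> {

    // Each string is one pipeline stage, applied in order
    @Aggregation({
            "{ $unwind : { path : $skills } }",
            "{ $group : { _id : $skills, names : { $push : $name } } }",
            "{ $project : { _id : 0, skill : $_id, names : 1 } }"
    })
    Flux<SkilledPeople> getSkilledPeople();

}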

Application Properties:

spring.data.mongodb.database=admin
spring.data.mongodb.username=admin
spring.data.mongodb.password=password

Demo:

At this point, everything seems to be ready! We can run and verify.

@SpringBootApplication
@EnableReactiveMongoRepositories
public class ReactiveMongoApplication implements CommandLineRunner {

    @Autowired
    private FreelancerRepository repository;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveMongoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
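        // subscribe() returns immediately. If nothing else keeps the JVM alive
        // (for example, no web server is running), the app may exit before any
        // result is printed; blockLast() would wait for the results instead.
        this.repository
                .getSkilledPeople()
                .subscribe(System.out::println);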
    }
}

Output:

We get the list of skills and corresponding freelancers who can be hired.

SkilledPeople(skill=js, names=[sam, jack])
SkilledPeople(skill=angular, names=[jack])
SkilledPeople(skill=postgres, names=[jack])
SkilledPeople(skill=selenium, names=[smith])
SkilledPeople(skill=java, names=[james])
SkilledPeople(skill=mongo, names=[james])
SkilledPeople(skill=react, names=[sam])
SkilledPeople(skill=python, names=[sam])
SkilledPeople(skill=reactor, names=[james])
SkilledPeople(skill=qa, names=[smith])
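
The output above is a list of (skill, names) rows, while the UI expectation shown earlier is a single object keyed by skill. A minimal plain-Java sketch of that last step follows. SkillIndexSketch, SkillRow and toSkillIndex are hypothetical names, and SkillRow is only a stand-in for the SkilledPeople projection so that the example runs without Spring or Lombok. A TreeMap is used so the keys come out in a stable alphabetical order, since the rows themselves arrive in no particular order.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch; SkillRow is a hypothetical stand-in for the SkilledPeople projection.
class SkillIndexSketch {

    static final class SkillRow {
        final String skill;
        final List<String> names;

        SkillRow(String skill, List<String> names) {
            this.skill = skill;
            this.names = names;
        }
    }

    // Folds the per-skill rows into one map; TreeMap keeps the keys sorted alphabetically.
    static Map<String, List<String>> toSkillIndex(List<SkillRow> rows) {
        Map<String, List<String>> index = new TreeMap<>();
        for (SkillRow row : rows) {
            index.put(row.skill, row.names);
        }
        return index;
    }

    public static void main(String[] args) {
        List<SkillRow> rows = List.of(
                new SkillRow("js", List.of("sam", "jack")),
                new SkillRow("java", List.of("james")),
                new SkillRow("selenium", List.of("smith")));
        // prints {java=[james], js=[sam, jack], selenium=[smith]}
        System.out.println(toSkillIndex(rows));
    }
}
```

In the reactive application itself, the same fold could probably be done without leaving the pipeline, for example with Reactor's Flux.collectMap, which gathers the emitted elements into a Mono of a Map.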

Aggregation With Template:

Reactive Spring Data MongoDB Template:

Spring Data also supports the MongoDB Aggregation Pipeline framework via the Mongo Template. We can create all the stages as shown here and execute them.

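import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.GroupOperation;
import org.springframework.data.mongodb.core.aggregation.ProjectionOperation;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.aggregation.UnwindOperation;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

@Service
public class FreelancerService {

    @Autowired
    private ReactiveMongoTemplate reactiveMongoTemplate;

    public Flux<SkilledPeople> getSkilledPeople(){

        // unwind: one document per element of the skills array
        UnwindOperation unwindOperation = Aggregation.unwind("skills");
        // group: collect the names for each skill
        GroupOperation groupOperation = Aggregation.group("skills").push("name").as("names");
        // project: keep names and expose the group key (_id) as skill
        ProjectionOperation projectionOperation = Aggregation.project("names").and("_id").as("skill");

        // add all the stages
        TypedAggregation<Freelancer> skillTypedAggregation = Aggregation.newAggregation(
                Freelancer.class,
                unwindOperation,
                groupOperation,
                projectionOperation
        );

        // execute and map each result to the projection class
        return this.reactiveMongoTemplate
                .aggregate(
                        skillTypedAggregation,
                        SkilledPeople.class
                );

    }

}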

Demo:

We get the same output if we modify the runner as shown below.

@SpringBootApplication
@EnableReactiveMongoRepositories
public class ReactiveMongoApplication implements CommandLineRunner {

    @Autowired
    private FreelancerService service;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveMongoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        this.service
                .getSkilledPeople()
                .subscribe(System.out::println);
    }
    
}

Summary:

We successfully demonstrated the MongoDB Aggregation Pipeline using both the Reactive Spring Data MongoDB Repository and the Template.

The source code is available here.

Learn more about MongoDB + Reactive Spring Data here.

Happy learning 🙂

3 thoughts on “MongoDB Aggregation Pipeline With Reactive Spring Data”

  1. Hi Vins,
    Thank you for great article, I would say: like always.

    I cloned the project, I tried it and I think there is an issue with CommandLineRunner:
    // KO
    this.repository
    .getSkilledPeople()
    .subscribe(System.out::println);

    // OK
    this.repository
    .getSkilledPeople()
    .blockLast();

    This behavior is logical: if you don’t block, the application will shut down immediately.
    I’m waiting for feedback
    Best regards

    1. I am really sorry for the late response. I have updated this project.
      Simply use docker-compose up. This will initialize all the data. Then run the app.
