Scatter Gather Pattern

Overview

In this tutorial, I would like to demonstrate the Scatter Gather Pattern, one of the Enterprise Integration Patterns for distributed system architectures, using the NATS messaging server.

Please check my previous article here to learn more about NATS.

Scatter Gather Pattern

Let’s consider an application in which we need to do a set of tasks to complete the business workflow. If these tasks do not depend on each other, then it does not make sense to do them sequentially. We can do these tasks in parallel.

[Figure: scatter gather pattern]
The Scatter Gather Pattern helps us distribute these tasks for parallel processing of tasks/messages/events and finally aggregate the responses into a single response, as shown above.
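
As a rough illustration of the idea in plain Java (no messaging involved yet), the scatter and gather steps could be sketched with CompletableFuture. The class name ScatterGatherSketch and the three task descriptions are made up for this example and are not part of the project.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical sketch: class and task names are illustrative, not from the project.
final class ScatterGatherSketch {

    // Scatter: start every independent task on the pool.
    // Gather: wait for all of them and combine the results into one list,
    // in the same order as the submitted tasks.
    static <T> List<T> scatterGather(List<Supplier<T>> tasks, ExecutorService pool) {
        List<CompletableFuture<T>> futures = tasks.stream()
                .map(task -> CompletableFuture.supplyAsync(task, pool))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Supplier<String>> tasks = List.of(
                    () -> "inventory checked",
                    () -> "payment authorized",
                    () -> "shipping quoted");
            System.out.println(scatterGather(tasks, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

This version waits for every task to finish. The rest of the tutorial relaxes that by using a timeout, since we do not control the responders.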

Sample Application

Let’s consider a flight booking application in which a user searches for flight deals. The application sends the request to all the airlines, collects their fares, and then responds.

[Figure: scatter gather pattern]

As our application depends on 3rd-party APIs and we need to provide the best experience to our users, we will publish the user request to all airlines. We will collect the results from whichever airlines respond within a specific timeout period and show the top 5 deals to our users.

The main application does not even know how many airlines are listening to the requests. Even if some of the airline services are not up and running, it is not going to affect our flight-app.
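
To make the "collect whatever arrives within the timeout" idea concrete, here is a minimal plain-Java sketch. It assumes a BlockingQueue standing in for the reply inbox; the class DeadlineGatherSketch, the method gatherUntil, and the sample airline strings are hypothetical and only illustrate the behaviour, not the NATS-based implementation shown later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the queue stands in for a reply inbox; names are illustrative.
final class DeadlineGatherSketch {

    // Drain replies from the inbox until the deadline passes.
    // The caller never needs to know how many responders exist.
    static <T> List<T> gatherUntil(BlockingQueue<T> inbox, long timeoutMillis) {
        List<T> replies = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            long remaining;
            while ((remaining = deadline - System.nanoTime()) > 0) {
                T reply = inbox.poll(remaining, TimeUnit.NANOSECONDS);
                if (reply == null) {
                    break; // window closed: late responders are simply ignored
                }
                replies.add(reply);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag, return what we have
        }
        return replies;
    }

    public static void main(String[] args) {
        BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
        // Two fast "airlines" reply; a third one is too slow and misses the window.
        new Thread(() -> inbox.add("DELTA: 72")).start();
        new Thread(() -> inbox.add("FRONTIER: 109")).start();
        Thread slow = new Thread(() -> {
            try {
                Thread.sleep(2_000);
                inbox.add("SLOW_AIR: 50");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        slow.setDaemon(true); // do not keep the JVM alive for the late reply
        slow.start();

        System.out.println(gatherUntil(inbox, 500));
    }
}
```

The slow responder does not delay or fail the request; its reply just never makes it into the result.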

NATS Server

Please ensure that the NATS server is up and running. We can easily spin up NATS using Docker.

docker run -p 4222:4222 nats:alpine
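
If the applications later fail with "Unable to connect to NATS servers", it can help to first confirm that something is listening on the port. The following is a small sketch using only the JDK; NatsPortCheck and isReachable are hypothetical names, and the check only verifies that a TCP connection can be opened (it assumes the URL includes an explicit port and does not speak the NATS protocol).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

// Hypothetical helper, not part of the project: checks whether anything
// accepts TCP connections at the host and port of the given NATS URL.
final class NatsPortCheck {

    static boolean isReachable(String natsUrl, int timeoutMillis) {
        URI uri = URI.create(natsUrl); // e.g. nats://localhost:4222
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        String url = "nats://localhost:4222";
        System.out.println(url + " reachable: " + isReachable(url, 1_000));
    }
}
```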

Project Setup

Create a Spring Boot application with the dependencies below.

It will be a multi-module Maven project as shown here.

Our project depends on the super-fast NATS messaging server, so add this dependency as well.

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>2.6.8</version>
</dependency>

Common DTO

  • Flight Search Request
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class FlightSearchRequest {

    private String from;
    private String to;

}
  • Flight Schedule
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class FlightSchedule {

    private String date;
    private int price;
    private String airline;

}
  • Flight Search Response
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class FlightSearchResponse {

    private FlightSearchRequest searchRequest;
    private List<FlightSchedule> schedules;

}
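
For readers not familiar with Lombok, the annotations on these DTOs generate roughly the following code. This is a hand-written approximation for FlightSchedule only, under the made-up name FlightSchedulePlain; Lombok's real output differs in some details (for example, its equals implementation).

```java
import java.util.Objects;

// Hypothetical hand-written counterpart of the Lombok-annotated FlightSchedule.
class FlightSchedulePlain {

    private String date;
    private int price;
    private String airline;

    // @NoArgsConstructor
    public FlightSchedulePlain() {
    }

    // @AllArgsConstructor(staticName = "of"): private constructor plus a static factory
    private FlightSchedulePlain(String date, int price, String airline) {
        this.date = date;
        this.price = price;
        this.airline = airline;
    }

    public static FlightSchedulePlain of(String date, int price, String airline) {
        return new FlightSchedulePlain(date, price, airline);
    }

    // @Data: getters, setters, equals, hashCode and toString
    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public int getPrice() { return price; }
    public void setPrice(int price) { this.price = price; }
    public String getAirline() { return airline; }
    public void setAirline(String airline) { this.airline = airline; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof FlightSchedulePlain)) return false;
        FlightSchedulePlain other = (FlightSchedulePlain) o;
        return price == other.price
                && Objects.equals(date, other.date)
                && Objects.equals(airline, other.airline);
    }

    @Override
    public int hashCode() {
        return Objects.hash(date, price, airline);
    }

    @Override
    public String toString() {
        return "FlightSchedulePlain(date=" + date + ", price=" + price + ", airline=" + airline + ")";
    }

    public static void main(String[] args) {
        System.out.println(FlightSchedulePlain.of("2021-01-02", 72, "DELTA"));
    }
}
```

The static of factory is what the services call later, for example FlightSchedule.of(date.toString(), randomPrice, AIRLINE).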

Airline – Service

This service class represents an individual airline. It receives the request and provides the schedules along with prices.

  • This is a separate app. We would be running multiple instances of this app.
public class AirlineService {

    private static final String AIRLINE = Objects.toString(System.getenv("AIRLINE_NAME"), "UNKNOWN");
    private static final String NATS_SERVER = Objects.toString(System.getenv("NATS_SERVER"), "nats://localhost:4222");

    public static void main(String[] args) throws IOException, InterruptedException {
        final Connection nats = Nats.connect(NATS_SERVER);
        final Dispatcher dispatcher = nats.createDispatcher(msg -> {});
        dispatcher.subscribe("flight.search", (msg) -> {
            ObjectUtil.toObject(msg.getData(), FlightSearchRequest.class)
                   .ifPresent(searchRequest -> {
                       List<FlightSchedule> flightSchedules = getFlightSchedules(searchRequest);
                       nats.publish(msg.getReplyTo(), ObjectUtil.toBytes(flightSchedules));
                   });
        });
    }

    private static List<FlightSchedule> getFlightSchedules(FlightSearchRequest searchRequest){
        // the search request is ignored; each call returns 1 to 3 random schedules
        int randomNoResponse = ThreadLocalRandom.current().nextInt(0, 3);
        return IntStream.rangeClosed(0, randomNoResponse)
                .mapToObj(i -> getRandomSchedule())
                .collect(Collectors.toList());
    }

    private static FlightSchedule getRandomSchedule(){
        int randomDate = ThreadLocalRandom.current().nextInt(0,  30);
        int randomPrice = ThreadLocalRandom.current().nextInt(50, 500);
        var date = LocalDate.now().plusDays(randomDate);
        return FlightSchedule.of(date.toString(), randomPrice, AIRLINE);
    }

}

Scatter Gather Pattern

Now let’s work on the customer-facing flight-search app.

  • NATS bean
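@Bean
public Connection nats(@Value("${nats.server}") String natsServer) throws IOException, InterruptedException {
    // Nats.connect declares both checked exceptions, so the bean method must propagate them
    return Nats.connect(natsServer);
}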
  • Controller
@RestController
@RequestMapping("flight")
public class FlightSearchController {

    @Autowired
    private ScatterGatherService service;

    @GetMapping("/{from}/{to}")
    public Mono<FlightSearchResponse> search(@PathVariable String from, @PathVariable String to){
        return this.service.broadcast(FlightSearchRequest.of(from, to));
    }

}
  • ScatterGatherService
    • This class is responsible for broadcasting the request and receiving the responses
@Service
public class ScatterGatherService {

    @Autowired
    private Connection nats;

    public Mono<FlightSearchResponse> broadcast(FlightSearchRequest flightSearchRequest){
        // create inbox
        String inbox = nats.createInbox();
        Subscription subscription = nats.subscribe(inbox);
        return Flux.generate((SynchronousSink<FlightSchedule[]> fluxSink) -> receiveSchedules(fluxSink, subscription))
                    .flatMap(Flux::fromArray)
                    .bufferTimeout(5, Duration.ofSeconds(1))
                    .map(list -> {
                        list.sort(Comparator.comparing(FlightSchedule::getPrice));
                        return list;
                    })
                    .map(list -> FlightSearchResponse.of(flightSearchRequest, list))
                    .next()
                    .doFirst(() -> nats.publish("flight.search", inbox, ObjectUtil.toBytes(flightSearchRequest)))
                    .doFinally(signalType -> subscription.unsubscribe());
    }

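    private void receiveSchedules(SynchronousSink<FlightSchedule[]> synchronousSink, Subscription subscription){
        try{
            // nextMessage returns null if nothing arrives within the timeout
            Message message = subscription.nextMessage(Duration.ofSeconds(1));
            if (message == null) {
                // no more replies in this window: complete so the buffered schedules are emitted
                synchronousSink.complete();
                return;
            }
            // generate requires a signal on every call, so fall back to an empty array
            synchronousSink.next(ObjectUtil.toObject(message.getData(), FlightSchedule[].class)
                    .orElseGet(() -> new FlightSchedule[0]));
        }catch (Exception e){
            synchronousSink.error(e);
        }
    }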

}
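
One design point worth noting: as I read it, bufferTimeout(5, ...) emits as soon as five schedules have arrived, so the sorted list contains the first five schedules received, which are not necessarily the five cheapest overall. If the cheapest five across all replies are wanted, one option is to gather everything within the time window and then sort and truncate. A minimal plain-Java sketch of that selection step follows; CheapestDealsSketch and Deal are hypothetical stand-ins (Deal mirrors only the airline and price of FlightSchedule).

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: Deal is a minimal stand-in for FlightSchedule.
final class CheapestDealsSketch {

    static final class Deal {
        private final String airline;
        private final int price;

        Deal(String airline, int price) {
            this.airline = airline;
            this.price = price;
        }

        String airline() { return airline; }
        int price() { return price; }

        @Override
        public String toString() {
            return airline + ":" + price;
        }
    }

    // Sort every gathered deal by price and keep at most `limit` of them.
    static List<Deal> cheapest(List<Deal> allDeals, int limit) {
        return allDeals.stream()
                .sorted(Comparator.comparingInt(Deal::price))
                .limit(limit)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Deal> gathered = List.of(
                new Deal("DELTA", 408),
                new Deal("UNITED_AIRLINE", 87),
                new Deal("FRONTIER", 109),
                new Deal("DELTA", 72),
                new Deal("UNITED_AIRLINE", 229),
                new Deal("FRONTIER", 310));
        System.out.println(cheapest(gathered, 5));
    }
}
```

The trade-off is latency: gathering everything means always waiting for the full window, whereas the buffer-based version can answer as soon as five schedules are in.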

Scatter Gather Pattern – Demo

  • I send a request
http://localhost:8080/flight/Houston/LasVegas
  • I receive a response as shown here
{
   "searchRequest":{
      "from":"Houston",
      "to":"LasVegas"
   },
   "schedules":[
      {
         "date":"2021-01-02",
         "price":72,
         "airline":"DELTA"
      },
      {
         "date":"2020-12-28",
         "price":87,
         "airline":"UNITED_AIRLINE"
      },
      {
         "date":"2021-01-02",
         "price":109,
         "airline":"FRONTIER"
      },
      {
         "date":"2021-01-08",
         "price":229,
         "airline":"UNITED_AIRLINE"
      },
      {
         "date":"2021-01-02",
         "price":408,
         "airline":"DELTA"
      }
   ]
}

Summary

We were able to successfully demonstrate the use of the Scatter Gather Pattern in our microservices architecture to process tasks in parallel and then aggregate the results.

Read more Microservice Design Patterns.

The source code is available here.

Happy learning 🙂

2 thoughts on “Scatter Gather Pattern”

  1. Hi Vinoth,
    It seems the instructions are not clear on how to start the NATS server. When I run the airlines app, I get the error below:

    Exception in thread “main” java.io.IOException: Unable to connect to NATS servers: nats://localhost:4222.
    at io.nats.client.impl.NatsConnection.connect(NatsConnection.java:239)
    at io.nats.client.impl.NatsImpl.createConnection(NatsImpl.java:29)
    at io.nats.client.Nats.createConnection(Nats.java:249)
    at io.nats.client.Nats.connect(Nats.java:131)
    at com.example.service.AirlineService.main(AirlineService.java:26)
