Site icon Vinsguru

Redis Stream With Spring Boot

Overview:

In this article, I would like to show Redis Stream With Spring Boot to demo how to implement real time stream processing.

Redis:

Redis was originally known as a REmote DIctionary Server for caching data. Along with Master/ReadReplication & Pub/Sub feature, Now Redis has added the support for Streams as well. Please take a look at some of the Redis articles if you have not read them already.

Stream Processing:

In the good old days, we used to collect data, store in a database and do nightly processing on the data. It is called batch processing!

In this Microservices era, we get continuous / never ending stream of data. Sometimes delaying this data processing might have a severe impact in our business. For example, Let’s consider an application like Netflix / YouTube. Based on the movie/videos we surf, these applications show immediate recommendations. It provides much better user experience and helps with the business. Similarly when we get all the credit card transactions, a Bank might want to check if there is any fraudulent activity and block the card immediately if it is found! Credit card provider would not want to delay this as part of nightly processing.

Stream processing is a real time continuous data processing. Lets see how we can achieve a simple real time stream processing using Redis Stream With Spring Boot.

We have also discussed real time data stream processing using Apache Kafka. Apache Kafka has been around for 10 years whereas Redis is relatively new in this field. Some of the features of the Redis Streams seem to have been inspired by Apache Kafka.  Problem with Kafka is it is very difficult to configure. The infrastructure maintenance is very challenging. But Redis is very easy and light weight.

Redis Streams vs Pub/Sub:

Pub/Sub Stream
No persistence Persistent Data
Receiver has to be up and running to receive the messages. It is a Fire & forget model Consumers do not have to be up and running. Consumers can connect to the publisher any time and consume the messages from where they left off.
All the subscribers would receive the same message Only one consumer from a consumer group will receive the message. So No duplicate message processing.

Redis Stream With Spring Boot:

Project Setup:

Create a Spring Boot project as shown below.

public enum Category {
    APPLIANCES,
    BOOKS,
    COSMETICS,
    ELECTRONICS,
    OUTDOOR;
}

I use lombok for getters and setters.

@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Product {

    private String name;
    private double price;
    private Category category;

}

Redis Stream – Producer:

The Producer application will keep on publishing PurchaseEvents  periodically configured via publish.rate.

@Service
public class PurchaseEventProducer {

    private AtomicInteger atomicInteger = new AtomicInteger(0);

    @Value("${stream.key}")
    private String streamKey;

    @Autowired
    private ProductRepository repository;

    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;

    @Scheduled(fixedRateString= "${publish.rate}")
    public void publishEvent(){
        Product product = this.repository.getRandomProduct();
        ObjectRecord<String, Product> record = StreamRecords.newRecord()
                .ofObject(product)
                .withStreamKey(streamKey);
        this.redisTemplate
                .opsForStream()
                .add(record)
                .subscribe(System.out::println);
        atomicInteger.incrementAndGet();
    }

    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar(){
        System.out.println(
                "Total Events :: " + atomicInteger.get()
        );
    }

}

To keep things simple, I am not using any DB (well..except Redis) in this demo. So I have a list of products as shown here.

@Repository
public class ProductRepository {

    private static final List<Product> PRODUCTS = List.of(
            // appliances
            new Product("oven", 500.00, Category.APPLIANCES),
            new Product("dishwasher", 125.00, Category.APPLIANCES),
            new Product("heater", 65.00, Category.APPLIANCES),
            new Product("vacuum cleaner", 48.00, Category.APPLIANCES),
            new Product("refrigerator", 1200.00, Category.APPLIANCES),
            // books
            new Product("how to win friends and influence", 13.00, Category.BOOKS),
            new Product("ds and algorithms", 70.00, Category.BOOKS),
            new Product("effective java", 41.00, Category.BOOKS),
            new Product("clean architecture", 32.00, Category.BOOKS),
            new Product("microservices", 16.00, Category.BOOKS),
            // cosmetics
            new Product("brush", 9.50, Category.COSMETICS),
            new Product("face wash", 13.00, Category.COSMETICS),
            new Product("makeup mirror", 17.50, Category.COSMETICS),
            // electronics
            new Product("sony 4k tv", 999.25, Category.ELECTRONICS),
            new Product("headphone", 133.25, Category.ELECTRONICS),
            new Product("macbook", 2517.25, Category.ELECTRONICS),
            new Product("speaker", 65.25, Category.ELECTRONICS),
            // outdoor
            new Product("plants", 9.75, Category.OUTDOOR),
            new Product("power tools", 73.50, Category.OUTDOOR),
            new Product("pools", 111.75, Category.OUTDOOR)
    );

    public Product getRandomProduct(){
        int random = ThreadLocalRandom.current().nextInt(0, 20);
        return PRODUCTS.get(random);
    }

}

The application.properties contains below properties.

stream.key=purchase-events
publish.rate=1000

Redis Stream – Consumer:

Our producer is ready. Lets create a consumer. To consume Redis Streams, we need to implement the StreamListener interface.

@Service
public class PurchaseEventConsumer implements StreamListener<String, ObjectRecord<String, Product>> {

    private AtomicInteger atomicInteger = new AtomicInteger(0);

    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;

    @Override
    @SneakyThrows
    public void onMessage(ObjectRecord<String, Product> record) {
        System.out.println(
                InetAddress.getLocalHost().getHostName() + " - consumed :" +
                record.getValue()
        );
        this.redisTemplate
                .opsForZSet()
                .incrementScore("revenue", record.getValue().getCategory().toString(), record.getValue().getPrice())
                .subscribe();
        atomicInteger.incrementAndGet();
    }

    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar(){
        System.out.println(
                "Total Consumed :: " + atomicInteger.get()
        );
    }

}

Redis Stream Configuration:

Once the consumer is created, We need to create a subscription by adding the above consumer to the StreamMessageListenerContainer instance.

@Configuration
public class RedisStreamConfig {

    @Value("${stream.key:purchase-events}")
    private String streamKey;

    @Autowired
    private StreamListener<String, ObjectRecord<String, Product>> streamListener;

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        var options = StreamMessageListenerContainer
                            .StreamMessageListenerContainerOptions
                            .builder()
                            .pollTimeout(Duration.ofSeconds(1))
                            .targetType(Product.class)
                            .build();
        var listenerContainer = StreamMessageListenerContainer
                                    .create(redisConnectionFactory, options);
        var subscription = listenerContainer.receiveAutoAck(
                Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
                streamListener);
        listenerContainer.start();
        return subscription;
    }

}

Dockerizing Infrastructure:

I want to create a multiple instances of consumer to process the purchase events. So I create docker file to dockerize our app.

# Use JRE11 slim
FROM openjdk:11.0-jre-slim

# Add the app jar
ADD target/*.jar redis-stream.jar

ENTRYPOINT java -jar redis-stream.jar
version: '3'
services:
  redis:
    image: redis
    ports:
      - 6379:6379
  redis-commander:
    image: rediscommander/redis-commander:latest
    depends_on:
      - redis
    environment:
      - REDIS_HOSTS=redis:redis
    ports:
      - 8081:8081
  producer:
    build: ./redis-stream-producer
    image: vinsdocker/redis-stream-producer
    depends_on:
      - redis
    environment:
      - SPRING_REDIS_HOST=redis
      - PUBLISH_RATE=1000
  consumer:
    build: ./redis-stream-consumer
    image: vinsdocker/redis-stream-consumer
    depends_on:
      - redis
    environment:
      - SPRING_REDIS_HOST=redis

Redis Stream – Set up:

Lets bring up the redis and redis-commander instances first.

docker-compose up redis redis-commander

You can access the redis instance at port 8081 as shown here.

You can create a stream as shown here. These are all redis commands related to stream. Explore those things here.

XADD purchase-events * dummy-key dummy-value

I create a consumer-group as shown here.

XGROUP CREATE purchase-events purchase-events $

Spring Boot – Producer:

In a separate terminal, run the below command to bring up the producer.

docker-compose up producer

Once the producer has started it will start publishing events as per the given schedule.

producer_1         | 1585682873612-0
producer_1         | 1585682873812-0
producer_1         | 1585682874013-0
producer_1         | 1585682874215-0
producer_1         | 1585682874413-0
producer_1         | 1585682874613-0
producer_1         | 1585682874812-0
producer_1         | 1585682875012-0
producer_1         | Total Events :: 51

Spring Boot – Consumer:

Lets bring up 3 instances of our consumer.

docker-compose up --scale consumer=3

We could see the consumer consuming all the purchase-events. The load is distributed among all the consumers. Here consumer_2 shows that it consumed more events it is because it started first before all other consumers started.

producer_1         | 1585682887612-0
consumer_2         | 7b6c828647b0 - consumed :Product(name=how to win friends and influence, price=13.0, category=BOOKS)
producer_1         | 1585682887813-0
consumer_3         | 83699cab10bd - consumed :Product(name=ds and algorithms, price=70.0, category=BOOKS)
producer_1         | 1585682888012-0
consumer_1         | cdb3357593e6 - consumed :Product(name=headphone, price=133.25, category=ELECTRONICS)
producer_1         | 1585682888212-0
consumer_2         | 7b6c828647b0 - consumed :Product(name=oven, price=500.0, category=APPLIANCES)
consumer_1         | Total Consumed :: 18
consumer_2         | Total Consumed :: 84
producer_1         | 1585682888412-0
consumer_3         | 83699cab10bd - consumed :Product(name=makeup mirror, price=17.5, category=COSMETICS)
producer_1         | 1585682888612-0
consumer_1         | cdb3357593e6 - consumed :Product(name=ds and algorithms, price=70.0, category=BOOKS)
consumer_3         | Total Consumed :: 16

Access the redis-commander and look for the sorted set – revenue. We could see the products revenue by the category for the purchases happening real time.

Summary:

We were able to successfully implement real time stream processing by using Redis Steam With Spring Boot.

The complete source code is here.

Happy learning 🙂

 

Share This:

Exit mobile version