
Kafka – Creating Simple Producer & Consumer Applications Using Spring Boot

Overview:

In the previous article, we discussed the basic terminology of Kafka and created a local development infrastructure using docker-compose. In this article, I would like to show how to create a simple Kafka producer and consumer using Spring Boot.

Prerequisite:

Goal:

The aim of this post is to show a way to produce messages into a Kafka topic so that they can be consumed later by a different application. To keep things easy to follow, we will produce random numbers and write them into a Kafka topic. Create a Kafka topic called random-number with 3 partitions.
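
One way to create the topic, if you would rather not use the Kafka command-line tools, is to let the application declare it. The following is a minimal sketch, assuming spring-kafka 2.3 or later is on the classpath and Spring Boot's auto-configured KafkaAdmin is active. The class name TopicConfig and the replica count of 1 are assumptions for illustration, not part of the original setup.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Hypothetical configuration class; the name and replica count are assumptions.
@Configuration
public class TopicConfig {

    // KafkaAdmin creates topics declared as NewTopic beans at startup
    // if they do not already exist on the broker.
    @Bean
    public NewTopic randomNumberTopic() {
        return TopicBuilder.name("random-number")
                .partitions(3)
                .replicas(1) // assumption: adjust to match your cluster
                .build();
    }
}
```

If the topic already exists with 3 partitions, this bean leaves it as it is.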

 

Kafka Producer:

spring:
  kafka:
    bootstrap-servers:
      - localhost:9091
    template:
      default-topic: random-number
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling
@SpringBootApplication
public class KafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ThreadLocalRandom;

@Component
public class RandomNumberProducer {

    private static final int MIN = 10;
    private static final int MAX = 100_000;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Scheduled(fixedRate = 1000)
    public void produce() throws UnknownHostException {
        int random = ThreadLocalRandom.current().nextInt(MIN, MAX);
        this.kafkaTemplate.sendDefault(String.valueOf(random));
        //just for logging
        String hostName = InetAddress.getLocalHost().getHostName();
        System.out.println(String.format("%s produced %d", hostName, random));
    }

}

Kafka Consumer:

spring:
  kafka:
    bootstrap-servers:
      - localhost:9091
      - localhost:9092
      - localhost:9093
    consumer:
      group-id: random-consumer
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

}

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.net.InetAddress;
import java.net.UnknownHostException;

@Component
public class RandomNumberConsumer {

    @KafkaListener(topics = "random-number")
    public void consume(String message) throws UnknownHostException {
        String hostName = InetAddress.getLocalHost().getHostName();
        System.out.println(String.format("%s consumed %s", hostName, message));
    }

}
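
Because both applications use the String serializer and deserializer, each random number travels through Kafka as text. The sketch below shows that round trip in plain Java with no broker involved. The class RandomNumberCodec and its method names are hypothetical helpers, not part of the applications above. It also makes the range explicit: nextInt(MIN, MAX) includes MIN and excludes MAX.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper for illustration; not part of the original applications.
public class RandomNumberCodec {

    static final int MIN = 10;
    static final int MAX = 100_000;

    // Same call the producer uses: MIN is inclusive, MAX is exclusive.
    static int next() {
        return ThreadLocalRandom.current().nextInt(MIN, MAX);
    }

    // The text the producer passes to KafkaTemplate.sendDefault(...).
    static String encode(int number) {
        return String.valueOf(number);
    }

    // What a consumer could do with the String it receives.
    static int decode(String message) {
        return Integer.parseInt(message);
    }

    public static void main(String[] args) {
        int number = next();
        String payload = encode(number);
        System.out.printf("sent %s, decoded back to %d%n", payload, decode(payload));
    }
}
```

Keeping the conversion in one place like this would let the consumer work with an int instead of a raw String, and it can be unit-tested without starting Kafka.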

Demo:

Summary:

We were able to successfully create simple producer and consumer applications. We also saw that all the produced messages are consumed by the consumer application. In the next article, we will see how we can dockerize and run multiple instances of producers and consumers.
