Spring Cloud Stream Kafka Binder

Overview:

In this tutorial, I would like to show you passing messages between services using Kafka Stream with Spring Cloud Stream Kafka Binder.

Spring Cloud Stream:

Spring Cloud Stream is a framework for creating message-driven Microservices and It provides a connectivity to the message brokers. Something like Spring Data, with abstraction, we can produce/process/consume data stream with any message broker (Kafka/RabbitMQ) without much configuration.  The producer/processor/consumer is simplified using Java 8’s functional interfaces.

Application Type Java Functional Interface
Kafka Producer Supplier
Kafka Consumer Consumer
Kafka Processor Function

Normally when we use the message broker for passing the messages between 2 applications, developer is responsible for message channel creation, type conversion – serialization and deserialization etc. Spring Cloud Stream takes care of serialization and deserialization, assumes configuration, creates topics etc. So that developer can focus on the business logic and need not worry about the infrastructure.

Sample Application:

spring cloud stream kafka

  • Producer publishes number starting from 1 every second
  • Processor – just squares the given number. (If it receives 3, it will return 9)
  • Consumer just prints the number received

Kafka Set up:

Take a look at this article Kafka – Local Infrastructure Setup Using Docker Compose, set up a Kafka cluster. Once done, create 2 topics.

    • numbers (Topic 1 / Source topic)
    • squares (Topic 2 / Sink topic)

Spring Boot – Project Set up:

  • Create a simple spring boot application with below dependencies.

Spring Cloud Stream – Producer:

This method with couple of lines acts as a producer. (You can use any type T. But the return type should be Supplier<T>). I would like to publish a number every second. So I have used Flux.interval. Supplier is responsible for publishing data to a Kafka topic. (Topic configuration is part of the configuration file)

@Bean
public Supplier<Flux<Long>> producer(){
    return () -> Flux.interval(Duration.ofSeconds(1))
                        .log();
}

Spring Cloud Stream – Processor:

This function will act as a processor. It consumes the data from a Kafka topic, processes data and sends it to another topic.

@Bean
public Function<Flux<Long>, Flux<Long>> processor(){
    return longFlux -> longFlux
                            .map(i -> i * i)
                            .log();
}

Spring Cloud Stream – Consumer:

Here we consume the data from a Topic.

@Bean
public Consumer<Long> consumer(){
    return (i) -> {
        System.out.println("Consumer Received : " + i);
    };
}

If you see the above methods, they are very simple and easy to understand. We do not have to deal with Kafka libraries as they are all taken care by Spring Cloud Stream Kafka Binder.

Now, the next obvious question would be – Where is this data getting published? How does Spring Cloud Stream Kafka Binder assume the message channel name / topic names. ? This is where the auto-configuration comes into picture.

Spring Cloud Stream – Configuration:

We have created below beans. The name of the beans can be anything. It does not have to be exactly as I have shown here.

  • producer
  • processor
  • consumer

The application.yaml should be updated with the beans. Spring Cloud Stream treats them as producer or processor or consumer based on the Type (Supplier / Function / Consumer).

spring.cloud.stream:
  function:
    definition: producer;processor;consumer

Here I have semi-colon separated the names as our application has 3 beans. If your application has only one, just give 1.

By default, Spring Cloud Stream creates below topics with below names.

  • For producer
    • [producer-name]-out-0
  • For processor, 2 topics would be created. One for incoming and one for outgoing.
    • [processor-name]-in-0
    • [processor-name]-out-0
  • For consumer
    • [consumer-name]-in-0

In our case, We would like to use our custom names for the topics. Also, producer output channel should be same as processor input channel. Processor output channel should be same as Consumer’s input channel. Spring cloud  stream simplifies that by allowing us to have a configuration like this in the application.yaml.

spring.cloud.stream:
  bindings:
    producer-out-0:
      destination: numbers
    processor-in-0:
      destination: numbers
    processor-out-0:
      destination: squares
    consumer-in-0:
      destination: squares

Demo:

That’s it! We can run the application now.

If I run my application, I see the output as shown below. Spring automatically takes care of all the configuration. We are able to produce, process and consume data very quickly without much configuration using Spring Cloud Stream Kafka Binder.

2020-04-23 01:16:43.615  INFO 23389 --- [     parallel-1] reactor.Flux.Interval.2                  : onNext(7)
2020-04-23 01:16:43.622  INFO 23389 --- [container-0-C-1] reactor.Flux.Map.1                       : onNext(49)
Consumer Received : 49
2020-04-23 01:16:44.615  INFO 23389 --- [     parallel-1] reactor.Flux.Interval.2                  : onNext(8)
2020-04-23 01:16:44.622  INFO 23389 --- [container-0-C-1] reactor.Flux.Map.1                       : onNext(64)
Consumer Received : 64
2020-04-23 01:16:45.615  INFO 23389 --- [     parallel-1] reactor.Flux.Interval.2                  : onNext(9)
2020-04-23 01:16:45.620  INFO 23389 --- [container-0-C-1] reactor.Flux.Map.1                       : onNext(81)
Consumer Received : 81
2020-04-23 01:16:46.615  INFO 23389 --- [     parallel-1] reactor.Flux.Interval.2                  : onNext(10)
2020-04-23 01:16:46.621  INFO 23389 --- [container-0-C-1] reactor.Flux.Map.1                       : onNext(100)
Consumer Received : 100

Summary:

We were able to successfully demonstrate services communication with Spring Cloud Stream Kafka Binder. The services are completely decoupled and able to communicate via Kafka Stream.

Learn more about Kafka.

The source code of this demo is available here.

Happy learning 🙂

Share This:

5 thoughts on “Spring Cloud Stream Kafka Binder

  1. Hi Sir ,

    Consumer Received this statement is not printing in console , i checkout same code even i googled it about solution but not getting any solution where i can print flux

    code snippet :
    @Bean
    public Supplier<Flux> publishBookEvent() {
    return () -> Flux.just(new Book(111, “spring boot”), new Book(122, “react”));
    }

    @Bean
    public Consumer<Flux<Book>> consumeBookEvent() {

    return (books) -> {
    System.out.println("books : " + books);
    };
    }

    1. You have a consumer of flux. You need to change it to Consumer<Book>. Ifyou need to haveConsumer<Flux<Book>>, then you need to invoke the subscribe method. bookFlux -> bookFlux.subscribe(System.out::println);

      1. Thanks @vinsguru , as you mention above i also tried bookFlux.subscribe(System.out::println) but consumer statement not printing in console . when i tried to print System.out.println(bookFlux) it displaying output as FluxMap . not sure what’s going wrong with it .

        also i verified in kafka-console message getting consumed but not printing

        if i try producing Supplier and consuming with Consumer then its working but not sure what’s going wrong with Flux . any solution please .

        @SpringBootApplication
        public class SpringbootWebfluxCloudStreamApplication {

        public static void main(String[] args) {
        SpringApplication.run(SpringbootWebfluxCloudStreamApplication.class, args);
        }

        @Bean
        public Supplier<Flux<Book>> publishBookEvent() {
        return () -> Flux.just(new Book(111, "spring boot"), new Book(122, "react"))
        .log();
        }

        @Bean
        public Consumer<Flux<Book>> consumeBookEvent() {
        System.out.println("method called ...");
        return bookFlux -> bookFlux.subscribe(System.out::println);
        }

        }

        1. @vinsguru sir , below is my application.yaml
          spring:
          cloud:
          stream:
          function:
          definition: publishBookEvent;consumeBookEvent
          bindings:
          publishBookEvent-out-0:
          destination: sample-topic
          consumeBookEvent-in-0:
          destination: sample-topic

          please let me know if anything i missed , just i checkout your code there also consumer statement not printing

  2. Thanks @vinsguru , as you mention above i also tried bookFlux.subscribe(System.out::println) but consumer statement not printing in console . when i tried to print System.out.println(bookFlux) it displaying output as FluxMap . not sure what’s going wrong with it .

    also i verified in kafka-console message getting consumed but not printing

    if i try producing Supplier and consuming with Consumer then its working but not sure what’s going wrong with Flux . any solution please .

    @SpringBootApplication
    public class SpringbootWebfluxCloudStreamApplication {

    public static void main(String[] args) {
    SpringApplication.run(SpringbootWebfluxCloudStreamApplication.class, args);
    }

    @Bean
    public Supplier<Flux<Book>> publishBookEvent() {
    return () -> Flux.just(new Book(111, "spring boot"), new Book(122, "react"))
    .log();
    }

    @Bean
    public Consumer<Flux<Book>> consumeBookEvent() {
    System.out.println("method called ...");
    return bookFlux -> bookFlux.subscribe(System.out::println);
    }

    }

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.