Discover and enable the integrations you need to solve identityAuth0 Marketplace
Apache Kafka

Spring Cloud Streams with Apache Kafka

Learn how to process streams of data from Apache Kafka topics using Spring Cloud Streams

Last Updated On: November 10, 2021

Apache Kafka

Spring Cloud Streams with Apache Kafka

Learn how to process streams of data from Apache Kafka topics using Spring Cloud Streams

Last Updated On: November 10, 2021

TL;DR: Have you ever wondered how features like Google Maps' live traffic work? These systems have to gather and process data in real-time. The architecture of these systems generally involves a data pipeline that processes and transfers data to be processed further until it reaches the clients. In this article, we will see something similar with a simple example using Kafka Streams. The sample app can be found here.

Introduction to Spring Cloud Stream

Spring Cloud Stream is a framework designed to support stream processing provided by various messaging systems like Apache Kafka, RabbitMQ, etc. The framework allows you to create processing logic without having to deal with any specific platform. It helps you build highly scalable event-driven microservices connected using these messaging systems.

The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices. The way it works is simple; you have to provide implementations (called Binder implementations)for the messaging system that you are using. Spring cloud stream supports:

And a few others. The links above will take you to the binder implementations. In this article, we will look into a simple application that uses Kafka Streams as a stream processor listening to events on a topic, processing the data, and publishing it to the outgoing topic.

Introduction to Apache Kafka

Apache Kafka is a distributed publish-subscribe messaging system. It is a system that publishes and subscribes to a stream of records, similar to a message queue. Kafka is suitable for both offline and online message consumption. It is fault-tolerant, robust, and has a high throughput. Kafka is run as a cluster on one or more servers that can span multiple data centers. The Kafka cluster stores stream of records in categories called topics. Each record consists of a key, a value, and a timestamp. For more information on topics, Producer API, Consumer API, and event streaming, please visit this link.

Introduction to Kafka Streams

Kafka Streams is a library that can be used to consume data, process it, and produce new data, all in real-time. It works on a continuous, never-ending stream of data. Consider an example of the stock market. The stock prices fluctuate every second, and to be able to provide real-time value to the customer, you would use something like Kafka streams.

Pre-requisites:

  1. Basic knowledge of Java 11.
  2. Basic knowledge of Spring Boot.
  3. A basic understanding of Apache Kafka.
  4. Docker and Docker Compose for running Kafka locally.

Setting up Spring Boot App

Let us first create a Spring Boot project with the help of the Spring boot Initializr, and then open the project in our favorite IDE. Select Gradle project and Java language. Last but not least, select Spring boot version 2.5.4. Fill in the project metadata and click generate.

For Spring Cloud, We need to configure Spring Kafka and Kafka Streams in our gradle.build:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.apache.kafka:kafka-streams'
    implementation 'org.springframework.kafka:spring-kafka'
}

Let's setup the config for Kafka. We need to define a few parameters on how we want to serialize and deserialize the data. The config is easy to set up and understand. Since our application will be listening to a topic and producing the output to a different topic, our application is a producer and a consumer both. So, we need to define config for both producer and consumer.

In the application.yml file, we need to add these entries.

kafka:
  bootstrap-servers: localhost:9092
  properties:
    schema.registry.url: http://localhost:8081

  producer:
    client-id: ${spring.application.name}
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer

  consumer:
    client-id: ${spring.application.name}
    group-id: ${spring.application.name}-group
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    auto-offset-reset: earliest

  streams:
    client-id: ${spring.application.name}-stream
    application-id: ${spring.application.name}
    properties:
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

If you look at the config carefully, we are setting up serializers and de-serializers for the producer, the consumer, and the streams (serde is just short for serializer-deserializer). This is the only setup we need for the Spring boot project.

Let's jump into creating the producer, the consumer, and the stream processor. I have taken a simple example here. We are producing random numbers every 2 seconds using a scheduler.

@Component
@AllArgsConstructor
public class NumberProducer {

    NumberPublisher numberPublisher;

    @Scheduled(fixedRate = 2000)
    public void produceIntStream() {
        Random random = new Random();
        numberPublisher.produce(random.nextInt(1000));
    }
}

The number publisher is the actual publisher that puts the data on a topic. We set a key for the message and the data (which is a random number in our case).

@Slf4j
@Component
@AllArgsConstructor
public class NumberPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void produce(Integer randomNumber) {
        String s = "Odd";
        if (randomNumber % 2 == 0) s = "Even";
        System.out.println("Produced number: " + randomNumber);
        kafkaTemplate.send(INPUT_TOPIC_NAME, s, String.valueOf(randomNumber));
    }
}

The key is defined as a String, which is either even or odd based on the number. We use the Kafka template to send the message; this comes from the spring-kafka library. It abstracts out the logic for publishing and consuming the messages.

Next up, we set up our stream processor that listens to the topic on which the publisher is putting the messages. This is where it gets interesting. We listen to the INPUT_TOPIC and then process the data. In this case, the job of the stream processor is to filter out the odd numbers and only send the even numbers on the OUTPUT_TOPIC.

@Configuration
@EnableKafkaStreams
public class KafkaStream {

    public static final String OUTPUT_TOPIC_NAME = "even-number-topic";
    public static final String INPUT_TOPIC_NAME = "number-topic";

    @Bean
    public KStream<String, String> evenNumbersStream(StreamsBuilder kStreamBuilder) {
        KStream<String, String> input = kStreamBuilder.stream(INPUT_TOPIC_NAME);

        KStream<String, String> output = input.filter((key, value) -> key.equals("Even"));

        output.to(OUTPUT_TOPIC_NAME);
        return output;
    }
}

You might be wondering about that KStream in the return type of our method. I will give a brief overview here as it is outside the scope of this article.

KStream -> A Kafka stream that is append-only. When you provide data with the same key, it will not update the previous record. It provides several operations that are very useful for data processing, like a filter, map, partition, flatMap, etc. You can read more about KStreams here.

Finally, when we have processed the data, we put it on an OUTGOING_TOPIC. For the sake of simplicity and completion, I am listening to that topic in our application. This generally will not be the case, as there would be another application that would be consuming from that topic and hence the name OUTGOING_TOPIC.

@Component
@EnableKafka
public class EvenNumberConsumer {

    @KafkaListener(topics = OUTPUT_TOPIC_NAME)
    public void receive(String value) {
        System.out.println("Received number: " + value);
    }
}

The application code is complete. Let's set up Kafka locally.

Setting up Kafka Locally

Setting up Kafka is easy, but it requires some dependency to run, you just need to use the docker-compose file below, and it will start the Kafka server locally. Add the docker compose.yml to the repository's root directory. Start the required dependency using: docker-compose up.

version: '3.7'

services:
  kafka:
    image: confluentinc/cp-kafka:5.5.0
    container_name: kafka
    hostname: kafka
    restart: always
    environment:
      KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://:29092,LISTENER_DOCKER_EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:29092,LISTENER_DOCKER_EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092
      - 9999:9999
    depends_on:
      - zookeeper

  zookeeper:
    container_name: zookeeper
    hostname: zookeeper
    image: confluentinc/cp-zookeeper:5.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - 2181:2181

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    restart: always
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka:29092"
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
    ports:
      - 8081:8081
    depends_on:
      - zookeeper

That's it!

Verify Sending, Processing, and Receiving of Events

Run the Spring Boot app:

  1. Clone the sample code from the repo. Go to the root directory.
  2. Use the Gradle plugin to run your Spring Boot app using the command in the project directory.
./gradlew bootRun

Just run the application. You should see logs like this.

Received number: 910
Received number: 320
Received number: 16
Received number: 526
Received number: 76
Received number: 936
Received number: 642
Produced number: 510
Received number: 510
Produced number: 996
Received number: 996
Produced number: 897

Conclusion

Spring Cloud Stream provides a simple and convenient way to create apps that can process streams and publish data to different topics. You can build micro-services that talk to each other using Kafka messages and process data like you would process in a single application.

In this article, we have learned how to build a Spring Cloud Stream app that uses Kafka Streams. We saw how Spring Cloud Stream provides an easy way to set up and run an application that can consumer, process, and publish messages to Kafka topics without the hassle of configuring each. With such little code, we could do so much.

You can refer to the repository used in the article on Github.

  • Twitter icon
  • LinkedIn icon
  • Faceboook icon