In the world of big data, a reliable streaming platform is essential. Let’s send and receive simple event messages from SpringBoot using Kafka, currently the most popular streaming platform.

Installing Kafka

Obviously, to use Kafka, you either need to use a server where Kafka is already installed or install it yourself.

I’m going to install Kafka in my local development environment using Docker.

If you’re using Kafka installed elsewhere, just substitute that Kafka address wherever it appears in this post.

Quick Concept Overview

Kafka Broker

  • A Kafka cluster consists of one or more brokers
  • Responsible for handling producers and consumers and maintaining replicated data in the cluster

Kafka Topic

  • A category where records are published
  • The subject of Kafka messages

Kafka Producer

  • An application written to send data to Kafka
  • Data producer

Kafka Consumer

  • A program written to retrieve data from Kafka
  • Data consumer

Zookeeper

  • Used to manage the Kafka cluster, track node status, and maintain a list of topics and messages

Dockerfile

I’m going to install Zookeeper and Kafka.

Instead of writing a separate Dockerfile, I’ll use docker-compose.yml.

# docker-compose.yml - kafka/zookeeper
version: '3'

services:
  zookeeper:
    image: arm64v8/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

Since Zookeeper will use port 2181 and Kafka will use port 9092, check whether these ports are already in use before starting the containers.

Use the docker ps command to verify that these processes are running properly.
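For example, assuming the docker-compose.yml above is in your current directory, the following brings the containers up in the background and then lists the running containers:

docker-compose up -d
docker ps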

Working with Kafka - Connecting, Creating Topics

Connecting to Kafka

Once the Zookeeper and Kafka containers are running, connect to Kafka.

docker exec -it kafka /bin/sh

Creating a Kafka Topic

The Kafka scripts inside the Kafka container are located in the bin folder under the kafka_<version> folder in /opt.

In my case, I worked in /opt/kafka_2.13-2.8.1/bin (the Kafka version may differ depending on when you installed it or which image you used). Navigate to that bin directory and create a topic with the following command.

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic wool_kafka_topic

You can verify the topic creation with the following command.

kafka-topics.sh --list --zookeeper zookeeper:2181
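Before wiring up Spring, you can optionally smoke-test the topic with the console scripts in the same bin directory (run each command in its own shell; anything typed into the producer should appear in the consumer):

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic wool_kafka_topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wool_kafka_topic --from-beginning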

Now that we’ve completed the simple Kafka setup using Docker, let’s work on connecting it with Spring.

I’ve previously posted about simple topic-related commands and how to handle them with Python.

Installing Spring Dependencies

Depending on whether your SpringBoot project was started with Gradle or Maven, add the dependency using the method appropriate for your environment.

Alternatively, when creating a SpringBoot project in IntelliJ, you can select Web and Kafka in the Spring Initializr wizard to add them from the start.

Gradle -> build.gradle

// https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
implementation 'org.springframework.kafka:spring-kafka:2.8.5'

Maven -> pom.xml

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.5</version>
</dependency>

Creating Spring Applications - Producer, Consumer

In the earlier post mentioned above, the Producer and Consumer applications were written in Python; this time we’ll build them with SpringBoot.

While you can write both Producer and Consumer in a single SpringBoot Application, I’m going to work on sending and receiving data between different SpringBoot applications.

Producer Application - config

I’ll create a SpringBoot application and set up basic server information in application.yml (delete the pre-configured application.properties).

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
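For reference, below is a rough Java-based equivalent of the beans SpringBoot auto-configures from this YAML. This is a sketch only (the class name KafkaProducerConfig is my own); with the YAML in place you normally don’t write this class yourself.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // mirrors spring.kafka.producer.* from application.yml
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    // the template injected into the controller below
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}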

Producer Application - Controller

package com.example.producerapplication.controller;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
public class ProduceController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/publish")
    public String publish() {
        int leftLimit = 48; // numeral '0'
        int rightLimit = 122; // letter 'z'
        int targetStringLength = 10;
        Random random = new Random();

        String generatedString = random.ints(leftLimit, rightLimit + 1)
                // keep only alphanumeric code points: '0'-'9', 'A'-'Z', 'a'-'z'
                .filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
                .limit(targetStringLength)
                .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
                .toString();

        this.kafkaTemplate.send("wool_kafka_topic", generatedString);

        return "success";
    }
}
  • Uses a REST controller
  • Injects a KafkaTemplate for sending data to Kafka
  • Exposes a /publish endpoint that sends from SpringBoot to Kafka. In my case, instead of sending a fixed value, I generate a random string and send that (an optional delivery-callback sketch follows below).
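If you want confirmation that a message actually reached the broker, you can attach a callback to the send call inside publish(). This is a sketch, not part of the original example; in spring-kafka 2.8.x, send() returns a ListenableFuture<SendResult<K, V>>:

this.kafkaTemplate.send("wool_kafka_topic", generatedString)
        .addCallback(
                // on success, log where the record landed
                result -> System.out.println("sent, offset=" + result.getRecordMetadata().offset()),
                // on failure, log the error
                ex -> System.err.println("send failed: " + ex.getMessage()));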

Consumer Application - config

For the consumer application, I added settings for basic Kafka information plus a different server port.

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: my_group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
  port: 8081
  • auto-offset-reset: ‘earliest’ means a consumer with no previously committed offset starts reading from the earliest available event
  • key-deserializer and value-deserializer deserialize the key and value sent by the Kafka producer (a Java-based equivalent is sketched below)
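As with the producer, here is a rough Java-based sketch of what SpringBoot derives from this YAML (the class name KafkaConsumerConfig is my own; auto-configuration normally creates these beans for you):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    // mirrors spring.kafka.consumer.* from application.yml
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // container factory used by @KafkaListener methods
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}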

Consumer Application - Service

package com.example.consumerapplication.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    @KafkaListener(topics = "wool_kafka_topic")
    public void receive(String message) {

        System.out.println(message);
    }

}
  • No Controller is written here. You could also give the Consumer Application its own data-sending functionality, but this example focuses on receiving data.
  • ConsumerService lives in the service package, and the @KafkaListener annotation sets up the Kafka listener.
  • The connection information it uses is parsed from application.yml by SpringBoot and the Kafka library (a variant listener is sketched below).
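As a variant (a sketch, not part of the original example), the listener method can take the full ConsumerRecord instead of just the String value, which exposes metadata such as partition and offset. Use it as a replacement for the listener above rather than alongside it: with one partition and one consumer group, only a single listener in the group would receive messages.

package com.example.consumerapplication.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    @KafkaListener(topics = "wool_kafka_topic")
    public void receive(ConsumerRecord<String, String> record) {
        // the record carries the value plus metadata about where it came from
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
    }
}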

Execution

  • Start the SpringBoot Application acting as Producer (localhost:8080)
  • Similarly, start the Consumer SpringBoot Application (localhost:8081)
  • Call the /publish endpoint on the Producer (for example, with the curl command shown after this list)
    • At this point, the randomly generated string is sent to Kafka
  • Check the Consumer Application console to see if the random string sent from Kafka is received
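A minimal way to trigger the whole flow from a terminal (assuming the default ports configured above):

curl http://localhost:8080/publish

If everything is wired up correctly, the Producer responds with "success" and the random string appears in the Consumer Application’s console.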
