Sending and Receiving Kafka Event Messages in SpringBoot
In the world of big data, a reliable streaming platform is essential. In this post, we'll send and receive simple event messages from SpringBoot using Kafka, one of the most widely used streaming platforms today.
Installing Kafka
Obviously, to use Kafka, you either need to use a server where Kafka is already installed or install it yourself.
I’m going to install Kafka in my local development environment using Docker.
If you're using Kafka installed elsewhere, you just need to change the Kafka address used in this post.
Quick Concept Overview
Kafka Broker
- A single Kafka cluster consists of brokers
- Responsible for handling producers and consumers and maintaining replicated data in the cluster
Kafka Topic
- A category where records are published
- The subject of Kafka messages
Kafka Producer
- An application written to send data to Kafka
- Data producer
Kafka Consumer
- A program written to retrieve data from Kafka
- Data consumer
Zookeeper
- Used to manage the Kafka cluster, track node status, and maintain a list of topics and messages
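To make these terms concrete, here is a minimal in-memory sketch in plain Java. It is not how Kafka is implemented and it uses no Kafka API; ToyBroker, publish, and poll are names made up for this illustration. It only shows the relationships: a broker holds topics, a topic is an append-only list of records, a producer appends to it, and each consumer keeps its own read position (offset).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a broker: NOT Kafka code, just an in-memory illustration.
class ToyBroker {
    // topic name -> append-only list of records
    private final Map<String, List<String>> topics = new HashMap<>();
    // "consumer@topic" -> index of the next record that consumer will read
    private final Map<String, Integer> offsets = new HashMap<>();

    // Producer side: append a record to the end of a topic.
    void publish(String topic, String record) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(record);
    }

    // Consumer side: return the records this consumer has not read yet,
    // then advance its offset to the end of the topic.
    List<String> poll(String consumer, String topic) {
        List<String> log = topics.getOrDefault(topic, List.of());
        String key = consumer + "@" + topic;
        int from = offsets.getOrDefault(key, 0);
        List<String> unread = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(key, log.size());
        return unread;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.publish("wool_kafka_topic", "first");
        broker.publish("wool_kafka_topic", "second");
        System.out.println(broker.poll("consumer-a", "wool_kafka_topic")); // [first, second]
        broker.publish("wool_kafka_topic", "third");
        System.out.println(broker.poll("consumer-a", "wool_kafka_topic")); // [third]
        System.out.println(broker.poll("consumer-b", "wool_kafka_topic")); // [first, second, third]
    }
}
```

Note that reading does not remove records: consumer-b still sees everything, because each consumer only moves its own offset.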
Dockerfile
I’m going to install zookeeper and kafka.
Instead of writing a separate Dockerfile, I’ll use docker-compose.yml.
# docker-compose.yml - kafka/zookeeper
version: '3'
services:
  zookeeper:
    image: arm64v8/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
Zookeeper uses port 2181 and Kafka uses port 9092, so check that these ports are not already in use before starting.
Start the containers with docker-compose up -d, then use the docker ps command to verify that both are running properly.
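If you'd like to check the two ports from Java before starting the containers, one option is simply to try binding them. This is a sketch using only the JDK; PortCheck and isPortFree are names invented here, and a successful bind only tells you the port was free at that moment on this machine. Once the containers are up, both ports should be reported as in use.

```java
import java.io.IOException;
import java.net.ServerSocket;

class PortCheck {
    // Returns true if a listening socket could be opened on the port,
    // i.e. nothing else on this host is currently bound to it.
    static boolean isPortFree(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            return false; // typically "Address already in use"
        }
    }

    public static void main(String[] args) {
        // 2181 = Zookeeper, 9092 = Kafka (the ports from docker-compose.yml)
        for (int port : new int[] {2181, 9092}) {
            System.out.println(port + (isPortFree(port) ? " is free" : " is in use"));
        }
    }
}
```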
Working with Kafka - Connecting, Creating Topics
Connecting to Kafka
Once the Zookeeper and Kafka containers are running, connect to Kafka.
docker exec -it kafka /bin/sh
Creating a Kafka Topic
The Kafka scripts inside the Kafka container are located in the bin folder under the kafka_
In my case, I worked in /opt/kafka_2.13-2.8.1/bin (the Kafka version may differ depending on when you installed it or which image you use).
Navigate to /opt/kafka_2.13-2.8.1/bin and create a topic with the following command.
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic wool_kafka_topic
You can verify the topic creation with the following command.
kafka-topics.sh --list --zookeeper zookeeper:2181
Now that we’ve completed the simple Kafka setup using Docker, let’s work on connecting it with Spring.
I’ve previously posted about simple topic-related commands and how to handle them with Python.
Installing Spring Dependencies
Add the dependency using whichever build tool your SpringBoot project was created with, Gradle or Maven.
Alternatively, when starting SpringBoot in IntelliJ, you can select Web and Kafka in Spring Starter to install them.
Gradle -> build.gradle
// https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
implementation 'org.springframework.kafka:spring-kafka:2.8.5'
Maven -> pom.xml
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.5</version>
</dependency>
Creating Spring Applications - Producer, Consumer
Related post: Producer and Consumer Applications written in Python
While you can write both Producer and Consumer in a single SpringBoot Application, I’m going to work on sending and receiving data between different SpringBoot applications.
Producer Application - config
I’ll create a SpringBoot application and set up basic server information in application.yml (delete the pre-configured application.properties).
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Producer Application - Controller
package com.example.producerapplication.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
public class ProduceController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/publish")
    public String publish() {
        int leftLimit = 48; // numeral '0'
        int rightLimit = 122; // letter 'z'
        int targetStringLength = 10;
        Random random = new Random();

        String generatedString = random.ints(leftLimit, rightLimit + 1)
                .filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
                .limit(targetStringLength)
                .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
                .toString();

        this.kafkaTemplate.send("wool_kafka_topic", generatedString);
        return "success";
    }
}
- Using a REST Controller
- Setting up KafkaTemplate to send data to Kafka
- Setting up the publish endpoint to send from SpringBoot to Kafka. In my case, instead of sending a fixed message, I used StringBuilder to build a random string and sent that.
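The random-string part of the controller can be pulled out and tried on its own, without Spring or Kafka. The sketch below reuses the same stream pipeline; the class name RandomStrings and the method alphanumeric are made up for this example.

```java
import java.util.Random;

class RandomStrings {
    // Builds a random string of the given length containing only 0-9, A-Z, a-z.
    static String alphanumeric(Random random, int length) {
        int leftLimit = 48;   // numeral '0'
        int rightLimit = 122; // letter 'z'
        return random.ints(leftLimit, rightLimit + 1)
                // drop the punctuation between '9' and 'A' (58-64) and between 'Z' and 'a' (91-96)
                .filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
                .limit(length)
                .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
                .toString();
    }

    public static void main(String[] args) {
        // Prints 10 random alphanumeric characters; the value differs on every run.
        System.out.println(alphanumeric(new Random(), 10));
    }
}
```

Passing the Random in as a parameter is only a convenience for this sketch: a seeded Random makes the output repeatable when experimenting.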
Consumer Application - config
For the consumer application, I added settings for basic Kafka information plus a different server port.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: my_group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
  port: 8081
- auto-offset-reset: ‘earliest’ means that when the consumer group has no committed offset yet, the consumer starts reading from the earliest event still stored in the topic
- key-deserializer and value-deserializer are responsible for deserializing the key and value sent by the Kafka producer
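The auto-offset-reset setting only comes into play when the consumer group has no usable committed offset for a partition. That decision can be sketched in plain Java as below. This illustrates the rule and is not Kafka client code; OffsetReset, startOffset, and the parameter names are hypothetical, and only the ‘earliest’ and ‘latest’ policies are modeled.

```java
import java.util.OptionalLong;

class OffsetReset {
    // Decides where a consumer starts reading a partition.
    // committed:   offset stored for the group, empty if none exists yet
    // logStart:    offset of the oldest record still in the partition
    // logEnd:      offset the next produced record will get
    // resetPolicy: "earliest" or "latest"
    static long startOffset(OptionalLong committed, long logStart, long logEnd, String resetPolicy) {
        if (committed.isPresent()
                && committed.getAsLong() >= logStart
                && committed.getAsLong() <= logEnd) {
            return committed.getAsLong(); // resume where the group left off
        }
        switch (resetPolicy) {
            case "earliest": return logStart; // read everything still stored
            case "latest":   return logEnd;   // read only records produced from now on
            default: throw new IllegalArgumentException("unsupported policy: " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // New group, 5 records (offsets 0..4) already in the topic.
        System.out.println(startOffset(OptionalLong.empty(), 0, 5, "earliest")); // 0
        System.out.println(startOffset(OptionalLong.empty(), 0, 5, "latest"));   // 5
        // Group that already committed offset 3: the policy is not consulted.
        System.out.println(startOffset(OptionalLong.of(3), 0, 5, "latest"));     // 3
    }
}
```

This is why, with ‘earliest’, a consumer application started for the first time after the producer can still see messages that were published before it came up.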
Consumer Application - Service
package com.example.consumerapplication.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    @KafkaListener(topics = "wool_kafka_topic")
    public void receive(String message) {
        System.out.println(message);
    }
}
- No Controller was written. While you can create data sending functionality in the Consumer Application, this example focuses on receiving data.
- I’m creating ConsumerService in the service package and applying annotations to set up the Kafka listener.
- The information used here is parsed from application.yml by SpringBoot and the Kafka library.
Execution
- Start the SpringBoot Application acting as Producer (localhost:8080)
- Similarly, start the Consumer SpringBoot Application (localhost:8081)
- Call the /publish URL endpoint from the Producer
  - At this point, the randomly generated string is sent to Kafka
- Check the Consumer Application console to see if the random string sent from Kafka is received
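Opening http://localhost:8080/publish in a browser is enough for the call in the steps above, but it can also be made from a small Java program (Java 11+). This is a sketch that assumes the producer runs on localhost:8080 as described; PublishClient and publishRequest are names invented for this example.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class PublishClient {
    // Builds a GET request for the producer's /publish endpoint.
    static HttpRequest publishRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/publish")).GET().build();
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: the producer application is running on localhost:8080.
        HttpRequest request = publishRequest("http://localhost:8080");
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            // Prints the HTTP status code and the response body.
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            System.out.println("Producer not reachable: " + e.getMessage());
        }
    }
}
```

Each successful call should make one new random string appear in the Consumer Application console.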