6 분 소요


빅 데이터의 세계에서 안정적인 스트리밍 플랫폼은 필수이다. 현재 가장 주목받고있는 kafka 스트리핑 플랫폼과 SpringBoot를 활용해서 간단한 Event 메시지를 주고받는 작업을 해보자

In the world of big data, a reliable streaming platform is essential. Let’s work on sending and receiving simple event messages using the currently most popular Kafka streaming platform with SpringBoot.

Kafka 설치하기

당연한 얘기지만 Kafka를 사용하기 위해서는 Kakfa가 설치되어있는 서버를 사용하거나 직접 설치해서 사용해야 한다

나는 Docker를 사용해서 Kafka를 로컬 개발환경에 설치 한 후, 사용하려고 한다.

다른곳에 설치되어있는 Kafka를 사용한다면 오늘 사용하는 Kafka 주소만 변경 해 주면 될 것 같다

간단하게 개념정리

Kafka Broker

  • 단일 Kafka 클러스터는 브로커로 구성
  • 생산자와 소비자를 처리하고 클러스터에 복제된 데이터를 유지하는 역할

Kafka Topic

  • 레코드가 게시되는 범주
  • 카프카 메시지의 주제

Kafka Producer

  • Kafka에 데이터를 가져오기 위해 작성하는 애플리케이션
  • 데이터 생산자

Kafka Consumer

  • Kafka에서 데이터를 가져오기 위해 작성하는 프로그램
  • 데이터 소비자

Zookeeper

  • Kafka 클러스터를 관리하고, 노드 상태를 추적하고, 주제 및 메시지 목록을 유지 관리하는 데 사용

Installing Kafka

Obviously, to use Kafka, you either need to use a server where Kafka is already installed or install it yourself.

I’m going to install Kafka in my local development environment using Docker.

If you’re using Kafka installed elsewhere, you just need to change the Kafka address used today.

Quick Concept Overview

Kafka Broker

  • A single Kafka cluster consists of brokers
  • Responsible for handling producers and consumers and maintaining replicated data in the cluster

Kafka Topic

  • A category where records are published
  • The subject of Kafka messages

Kafka Producer

  • An application written to send data to Kafka
  • Data producer

Kafka Consumer

  • A program written to retrieve data from Kafka
  • Data consumer

Zookeeper

  • Used to manage the Kafka cluster, track node status, and maintain a list of topics and messages

Dockerfile

zookeeper와 kafka를 설치하려고한다

따로 Docker파일을 작성하지 않고 docker-compose.yml로 작성하려고 한다

Dockerfile

I’m going to install zookeeper and kafka.

Instead of writing a separate Dockerfile, I’ll use docker-compose.yml.

#docker-compose.yml - kafka/zookeper
version: '3'

services:
    zookeeper:
    image: arm64v8/zookeeper
    container_name: zookeeper
    ports:
        - "2181:2181"
    kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
        - "9092:9092"
    environment:
        KAFKA_ADVERTISED_HOST_NAME: localhost
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

zookeeper 에서는 2181 포트를 사용 할 것이고, kafka에서는 9092 포트를 사용하기 때문에 현재 포트가 사용중인지 한번 확인 해야 한다

docker ps 명령어를 통해서 해당 프로세스들이 잘 실행되고 있는지 확인한다

Since zookeeper will use port 2181 and kafka will use port 9092, you should check if these ports are currently in use.

Use the docker ps command to verify that these processes are running properly.

Kafka 다루기 - 연결하기, topic 생성하기

Kafka 연결하기

Zookeeper 및 Kafka 컨테이너가 실행되면, 카프카에 접속 해 준다

Working with Kafka - Connecting, Creating Topics

Connecting to Kafka

Once the Zookeeper and Kafka containers are running, connect to Kafka.

docker exec -it kafka /bin/sh

Kafka Topic 만들기

Kafka 컨테이너 내에 카프카 스크립트들은 opt 폴더 내의 kafka_<버전> 폴더 내의 bin 아래에 있다.

나의 경우, opt/kafka_2.13-2.81/bin 에서 작업을 했다 (설치 시기나 이미지에 따라서 카프카버전이 다를 것) /opt/kafka_2.13-2.81/bin경로로 들어가서 아래와 같은 명령어로 토픽을 생성한다

Creating a Kafka Topic

The Kafka scripts inside the Kafka container are located in the bin folder under the kafka_ folder in opt.

In my case, I worked in opt/kafka_2.13-2.81/bin (the Kafka version may differ depending on when you installed it or the image). Navigate to /opt/kafka_2.13-2.81/bin and create a topic with the following command.

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic wool_kafka_topic

아래의 명령어로 토픽 생성을 확인 할 수 있다

You can verify the topic creation with the following command.

kafka-topics.sh --list --zookeeper zookeeper:2181

이제 Docker를 사용해서 간단한 kafka를 세팅은 완료했으니까, Spring과 이어질 작업을 해보자

간단한 토픽 관련된 명령어 및 python으로 핸들링 하는 방법은 미리 포스팅 해 본 적이 있다.

참조 포스팅

Now that we’ve completed the simple Kafka setup using Docker, let’s work on connecting it with Spring.

I’ve previously posted about simple topic-related commands and how to handle them with Python.

Reference Post

Spring Dependency 설치하기

SpringBoot를 gradle이나 maven으로 시작했는지 잘 기억하고, 각각 환경에 맞는 방법으로 설치 해 주면 될 것 같다.

혹은, Intellij에서 SpringBoot를 시작할때 Spring Starter에서 Web과 Kafka를 선택해서 설치 해 주어도 된다.

Gradle -> build.gradle

Installing Spring Dependencies

Remember whether you started SpringBoot with Gradle or Maven, and install it using the appropriate method for your environment.

Alternatively, when starting SpringBoot in IntelliJ, you can select Web and Kafka in Spring Starter to install them.

Gradle -> build.gradle

// https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
implementation 'org.springframework.kafka:spring-kafka:2.8.5'

maven -> pom.xml

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.5</version>
</dependency>

Spring Application 생성하기 - Producer, Consumer

파이썬으로 작성한 Producer,Consumer Application

SpringBoot Application 하나에 Producer, Consumer 모두 작성 할 수 있지만 서로 다른 스프링부트 어플리케이션에서 데이터를 주고받는 작업을 진행 해 보려고한다.

Producer Application - config

스프링부트 어플리케이션을 생성하고, applicaion.yml을 만들어 서버 기본정보를 세팅하려고한다 (기존에 미리 세팅되어있는 application.properties는 삭제 해준다)

Creating Spring Applications - Producer, Consumer

Producer and Consumer Applications written in Python

While you can write both Producer and Consumer in a single SpringBoot Application, I’m going to work on sending and receiving data between different SpringBoot applications.

Producer Application - config

I’ll create a SpringBoot application and set up basic server information in application.yml (delete the pre-configured application.properties).

spring:
    kafka:
    producer:
        bootstrap-servers: localhost:9092
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer

Producer Application - Controller

package com.example.producerapplication.controller;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
public class ProduceController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/publish")
    public String publish() {
        int leftLimit = 48; // numeral '0'
        int rightLimit = 122; // letter 'z'
        int targetStringLength = 10;
        Random random = new Random();

        String generatedString = random.ints(leftLimit, rightLimit + 1)
                .filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
                .limit(targetStringLength)
                .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
                .toString();

        this.kafkaTemplate.send("wool_kafka_topic", generatedString);

        return "success";
    }
}
  • REST Controller를 사용
  • KafkaTemplate를 사용해서 카프카에 데이터를 전송 할 수 있도록 세팅한다
  • publish 주소에서 SpringBoot -> Kafka 로 전송 할 수 있도록 세팅한다. 나의 경우에는 그냥 전송하기보다 임의의 Random String을 전송하도록 StringBuilder로 작성했다

Consumer Application - config

컨슈머 어플리케이션의 정보는 기본 카프카의 정보와 더불어 서버 포트를 달리 해 주는 설정까지 추가했다

  • Using REST Controller
  • Setting up KafkaTemplate to send data to Kafka
  • Setting up the publish endpoint to send from SpringBoot to Kafka. In my case, instead of just sending, I used StringBuilder to send a random string.

Consumer Application - config

For the consumer application, I added settings for basic Kafka information plus a different server port.

spring:
    kafka:
    consumer:
        bootstrap-servers: localhost:9092
        group-id: my_group
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
    port: 8081
  • auto-offset-reset: 가장 이른 것은 소비자가 가장 이른 이벤트부터 읽는다는 것을 의미
  • key-deserializer 및 value-deserializer는 메시지를 보내기 위해 Kafka 생산자가 보낸 키와 값을 역직렬화하는 역할
  • auto-offset-reset: ‘earliest’ means the consumer reads from the earliest event
  • key-deserializer and value-deserializer are responsible for deserializing the key and value sent by the Kafka producer

Consumer Application - Service

package com.example.consumerapplication.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    @KafkaListener(topics = "wool_kafka_topic")
    public void receive(String message) {

        System.out.println(message);
    }

}
  • Controller를 작성하지 않았다. Consumer Application에서 데이터를 전송하는 기능을 만들 수 있지만 현재 예제에서는 데이터를 받는것에 집중하려고한다
  • Service패키지에 ConsumerSerivce를 만들고 어노테이션을 적용해서 카프카 리스너를 세팅하려고 한다.
  • 여기서 사용되는 정보들은 application.yml에 있는 정보들을 스프링부트와 kafka 라이브러리가 파싱해서 가져간다
  • No Controller was written. While you can create data sending functionality in the Consumer Application, this example focuses on receiving data.
  • I’m creating ConsumerService in the service package and applying annotations to set up the Kafka listener.
  • The information used here is parsed from application.yml by SpringBoot and the Kafka library.

실행

  • Producer 역할을 하는 SpringBoot Application을 시작한다 (localhost:8080)
  • 마찬가지로 Consumer SpringBoot Application을 시작한다 (localhost:8081)
  • Producer에서 /publish의 url 주소를 호출 해 준다
    • 이 때 , 우리가 Random으로 생성한 문자열을 kafka로 보내게 된다
  • Consumer Application의 콘솔에서 kafka에서 보낸 랜덤스트링이 있는지 확인한다

Execution

  • Start the SpringBoot Application acting as Producer (localhost:8080)
  • Similarly, start the Consumer SpringBoot Application (localhost:8081)
  • Call the /publish URL endpoint from the Producer
    • At this point, the randomly generated string is sent to Kafka
  • Check the Consumer Application console to see if the random string sent from Kafka is received

댓글남기기