I wrote this short post while reading the book 카프카, 데이터 플랫폼의 최강자 (roughly, "Kafka, the Leading Data Platform"). For the basic setup and the example code, I followed the documentation on the official website rather than the book.

Apache Kafka

Creating a Kafka Topic and Writing Producer and Consumer Programs in Python

First, create a new topic. The examples in this post use a topic named blog-post.

bin/kafka-topics.sh --create --topic blog-post --bootstrap-server localhost:9092

Setting Up the Development Environment

  • Setting up a virtual environment
virtualenv venv
source venv/bin/activate

  • Installing the kafka module
(venv) $ pip3 install kafka-python

producer.py

I used the time module to measure how long it takes to send 10,000 messages to the topic.

import time
from kafka import KafkaProducer

# Create the producer.
# acks=0: do not wait for any broker acknowledgment (fastest, but messages may be lost).
# acks=1: wait until the partition leader has written the record (safer, but slower).
producer = KafkaProducer(
    acks=0,
    compression_type='gzip',
    bootstrap_servers=['localhost:9092'],
)

start = time.time()

for i in range(10000):
    producer.send('blog-post', b'Kafka Blog Post Event Message')
    producer.flush()  # block until the buffered record has been sent

elapsed = time.time() - start
print(elapsed)
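
The loop above calls flush() after every send(). Because flush() blocks until all buffered records have been sent, this keeps the producer from grouping records into batches. A minimal sketch of the alternative, assuming batching is what you want: queue every message first and flush once at the end. The helper `send_all` and the class `FakeProducer` are hypothetical names for illustration; `FakeProducer` only mimics the `send` and `flush` methods of `KafkaProducer` so that the sketch runs without a broker.

```python
import time


def send_all(producer, topic, payloads):
    """Queue every payload, flush once, and return (count, elapsed seconds)."""
    start = time.perf_counter()
    count = 0
    for payload in payloads:
        producer.send(topic, payload)  # asynchronous: only buffers the record
        count += 1
    producer.flush()  # block once until everything buffered has been sent
    return count, time.perf_counter() - start


class FakeProducer:
    """Hypothetical stand-in with the two KafkaProducer methods used above."""

    def __init__(self):
        self.buffered = []
        self.sent = []
        self.flush_calls = 0

    def send(self, topic, value):
        self.buffered.append((topic, value))

    def flush(self):
        self.flush_calls += 1
        self.sent.extend(self.buffered)
        self.buffered.clear()


if __name__ == "__main__":
    fake = FakeProducer()
    messages = [b"Kafka Blog Post Event Message"] * 10000
    count, elapsed = send_all(fake, "blog-post", messages)
    print(count, fake.flush_calls, elapsed)
```

With a running broker, passing a real `KafkaProducer` instead of `FakeProducer()` should work the same way. How much faster it is depends on the broker and the network, so no figure is claimed here.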

consumer.py

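from kafka import KafkaConsumer

# Create the consumer
consumer = KafkaConsumer(
    'blog-post',
    bootstrap_servers=['127.0.0.1:9092'],
    auto_offset_reset='earliest',  # with no committed offset, start from the oldest message
    enable_auto_commit=True,       # offset commits need a group_id, which this example does not set
    consumer_timeout_ms=1000       # stop iterating after 1 second without new messages
)

# The for loop ends whenever the timeout expires, so the outer loop restarts it
while True:
    for message in consumer:
        print(message.topic, message.partition, message.offset, message.key, message.value)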
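
Each message yielded by the consumer carries the fields printed above: topic, partition, offset, key, and value. As a sketch of doing something with them beyond printing, the hypothetical helper `latest_offsets` records the highest offset seen per partition and decodes the value bytes. `Record` is a hypothetical namedtuple that mimics only those five attributes so the example runs without a broker; it assumes the values are UTF-8 encoded bytes, as they are in producer.py.

```python
from collections import namedtuple

# Hypothetical stand-in for the message objects yielded by KafkaConsumer.
Record = namedtuple("Record", "topic partition offset key value")


def latest_offsets(messages):
    """Return ({(topic, partition): highest offset}, [decoded values])."""
    offsets = {}
    values = []
    for msg in messages:
        tp = (msg.topic, msg.partition)
        offsets[tp] = max(offsets.get(tp, -1), msg.offset)
        values.append(msg.value.decode("utf-8"))
    return offsets, values


if __name__ == "__main__":
    sample = [
        Record("blog-post", 0, offset, None, b"Kafka Blog Post Event Message")
        for offset in range(100, 104)
    ]
    offsets, values = latest_offsets(sample)
    print(offsets)
    print(values[0])
```

Since the helper only reads attributes, an iterable of real messages from the consumer could be passed in place of `sample`.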

Execution

  • The topic already exists, so all that is left is to run producer.py and consumer.py.
  • producer.py generates the data: it sends 10,000 messages to the 'blog-post' topic.
  • consumer.py reads the data. Because it runs in an infinite loop, it prints each message as soon as it arrives.
> python producer.py
3.8476390838623047

> python consumer.py
...
blog-post 0 100 None b'Kafka Blog Post Event Message'
blog-post 0 101 None b'Kafka Blog Post Event Message'
blog-post 0 102 None b'Kafka Blog Post Event Message'
blog-post 0 103 None b'Kafka Blog Post Event Message'
...
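
The elapsed time printed by producer.py can be turned into a rough throughput figure. Here is a small sketch of that arithmetic, using the time measured above for 10,000 messages. The function name `throughput` is my own, and the result describes only this single run on one machine.

```python
def throughput(num_messages, elapsed_seconds):
    """Return (messages per second, milliseconds per message)."""
    if num_messages <= 0 or elapsed_seconds <= 0:
        raise ValueError("num_messages and elapsed_seconds must be positive")
    per_second = num_messages / elapsed_seconds
    ms_per_message = elapsed_seconds / num_messages * 1000
    return per_second, ms_per_message


if __name__ == "__main__":
    per_second, ms_each = throughput(10000, 3.8476390838623047)
    print(f"{per_second:.0f} messages/s, {ms_each:.3f} ms per message")
```

For the run above this comes to roughly 2,600 messages per second, or about 0.38 ms per message, with a flush after every send.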
