Kafka로 메시지와 이벤트 처리하기 - (2) Python으로 consumer, producer 만들기 Message and Event Processing with Kafka (2) - Python Consumer and Producer
이번 글은 카프카, 데이터 플랫폼의 최강자 라는 책을 보면서 간단히 작성 해 봤다. 간단한 실행 및 예제 프로그래밍은 책보다 공식홈페이지에 있는 문서 를 참고했다.
Apache Kafka

Kafka Topic을 발행하고 producer와 consumer 프로그램 만들기 - Python
새로운 Topic을 생성하자. blog-post 라는 Topic을 생성 해서 예제로 사용 해 보고자 한다.
bin/kafka-topics.sh --create --topic blog-post --bootstrap-server localhost:9092
개발환경 구축하기
- 가상환경 설정하기
virtualenv venv
soruce venv/bin/activate
- kafka module 설치하기
(venv) $ pip3 install kafka-python
producer.py
10000개의 데이터를 Queue에 보내는데 얼마나 걸리는지 측정 해 보기 위해 time을 사용 해 보았다.
import time
from kafka import KafkaProducer
# producer 객체 생성
# acks 0 -> 빠른 전송우선, acks 1 -> 데이터 정확성 우선
producer = KafkaProducer(acks=0, compression_type='gzip',bootstrap_servers=['localhost:9092'])
start = time.time()
for i in range(10000):
producer.send('blog-post',b'Kafka Blog Post Event Message')
producer.flush() #queue에 있는 데이터를 보냄
end = time.time() - start
print(end)
consumer.py
from kafka import KafkaConsumer, consumer
# consumer 객체 생성
consumer = KafkaConsumer(
'blog-post',
bootstrap_servers=['127.0.0.1:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
consumer_timeout_ms=1000
)
while True:
for message in consumer:
print(message.topic, message.partition, message.offset, message.key, message.value)
실행
- 시작하기 전, Topic을 생성했기 때문에 producer.py와 consumer.py만 실행 해 주면 된다.
- producer.py는 데이터를 발생시키는 친구이고, 10000개의 데이터를 ‘blog-post’ Topic으로 전송한다.
- consumer.py는 데이터를 읽어오는 친구이고, 무한루프를 만들었기 때문에 데이터를 받는대로 출력 해 준다.
> python producer.py
3.8476390838623047
> python consumer.py
...
blog-post 0 100 None b'Kafka Blog Post Event Message'
blog-post 0 101 None b'Kafka Blog Post Event Message'
blog-post 0 102 None b'Kafka Blog Post Event Message'
blog-post 0 103 None b'Kafka Blog Post Event Message'
...
This post was written while reading the book Kafka, The Definitive Guide. For simple execution and example programming, I referred to the official documentation rather than the book.
Apache Kafka

Publishing Kafka Topics and Creating Producer and Consumer Programs - Python
Let’s create a new Topic. We’ll create a Topic called blog-post to use as an example.
bin/kafka-topics.sh --create --topic blog-post --bootstrap-server localhost:9092
Setting Up the Development Environment
- Setting up a virtual environment
virtualenv venv
soruce venv/bin/activate
- Installing the kafka module
(venv) $ pip3 install kafka-python
producer.py
I used time to measure how long it takes to send 10,000 messages to the Queue.
import time
from kafka import KafkaProducer
# Create producer object
# acks 0 -> prioritize fast transmission, acks 1 -> prioritize data accuracy
producer = KafkaProducer(acks=0, compression_type='gzip',bootstrap_servers=['localhost:9092'])
start = time.time()
for i in range(10000):
producer.send('blog-post',b'Kafka Blog Post Event Message')
producer.flush() # Send data in the queue
end = time.time() - start
print(end)
consumer.py
from kafka import KafkaConsumer, consumer
# Create consumer object
consumer = KafkaConsumer(
'blog-post',
bootstrap_servers=['127.0.0.1:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
consumer_timeout_ms=1000
)
while True:
for message in consumer:
print(message.topic, message.partition, message.offset, message.key, message.value)
Execution
- Since we created the Topic before starting, we just need to run producer.py and consumer.py.
- producer.py generates data and sends 10,000 messages to the ‘blog-post’ Topic.
- consumer.py reads data, and since we created an infinite loop, it prints messages as they are received.
> python producer.py
3.8476390838623047
> python consumer.py
...
blog-post 0 100 None b'Kafka Blog Post Event Message'
blog-post 0 101 None b'Kafka Blog Post Event Message'
blog-post 0 102 None b'Kafka Blog Post Event Message'
blog-post 0 103 None b'Kafka Blog Post Event Message'
...
댓글남기기