kafka Producer Consumer 비동기처리,produce() poll(), flush()
[kafka]kafka Producer Consumer 비동기처리
kafka topic 생성부터
먼저 topic을 생성해보자...png)
./kafka-topics --bootstrap-server kafka01:9092
--create --topic pcy.simple.producer --partitions 6
- pcy.simple.producer이라는 토픽명으로 파티션은 6개를 주었음
- default replication-factor값이 3이라 복제본은 3개씩 생성됨
이를 describe해보면..
.png)
다음과 같은 토픽을 생성하고 이제 consume을 돌려보겠음
.png)
./kafka-console-consumer --bootstrap-server kafka01:9092
--topic pcy.simple.producer
--property "print.key=true" # print.key key를 출력할지 여부
--property "key.separator=|" # key 출력시 구분자
--property "print.partition=true" # 메시지의 파티션 출력 여부
--property "print.offset=true" # 메시지의 offset 출력 여부
console에서 다음과 같이 컨슘상태에 들어가면 이제 Python 스크립트를 돌려보자
Python producer code
BROKER_LST = 'kafka01:9092,kafka02:9092,kafka03:9092'
class SimpleProducer:
# 생성자 파라미터로 topic명과 얼마나 돌릴건지 duration을 받음.
def __init__(self, topic, duration=None):
self.topic = topic
self.duration = duration if duration is not None else 60
self.conf = {'bootstrap.servers': BROKER_LST}
self.producer = Producer(self.conf)# Producer 정의
def produce(self):
cnt = 0
while cnt < self.duration:
try:
# 단순하게 cli상에서 property값을 출력하는 용도
self.producer.produce(
topic=self.topic,
key=str(cnt),
value=f'hello world: {cnt}',
on_delivery=self.delivery_callback) # 비동기 처리에 있어 핵심 파라미터
except BufferError:
sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\\n' %
len(self.producer))
# Serve delivery callback queue.
# NOTE: Since produce() is an asynchronous API this poll() call
# will most likely not serve the delivery callback for the
# last produce()d message.
self.producer.poll(0)
cnt += 1
time.sleep(1) # 1초 대기
# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\\n' % len(self.producer))
self.producer.flush()
# Producer쪽 Queue에 남아있는 데이터 있을까봐 마지막에 flush처리
Producer의 동기 전송 vs비동기 전송
- producer 메시지 전송시
동기전송 방식과비동기 전송 방식을 선택할 수 있음 - 일반적으로 Producer의 성능 향상을 위해 비동기 전송 방식을 채택하는데..
- 비동기 처리는 위 코드의 producer() 함수에
on_delivery파리미터에CallBack함수를 넣어 처리가능producer.produce(topic, key="key", value="value", on_delivery=acked)
delivery_callback 함수
# Optional per-message delivery callback (triggered by poll() or flush())
# when a message has been successfully delivered or permanently
# failed delivery (after retries).
def delivery_callback(self, err, msg):
if err: # err가 전송이 잘 되었다면 null값이 들어오고 에러시 로그 받음)
sys.stderr.write('%% Message failed delivery: %s\\n' % err)
else:
sys.stderr.write('%% Message delivered to %s [%d] @ %d\\n' %
(msg.topic(), msg.partition(), msg.offset()))
poll함수는 왜씀?
poll()함수의 용도는 Broker로 전송한 레코드에 대한 callback함수 호출 & 메시지의 정상 처리 여부를 확인하는 용도
.png)
poll()함수를 써야 하는 이유
- 예외처리 : 메시지의 정상 처리 여부 판단 후 예외 처리를 위한 용도
- QueueOverflow방지 : 비동기 방식 전송시 Broker에게 받은 Ack응답 또한 Queue에 쌓이게 되며 poll()을 호출해야 Queue에 쌓인 ack응답을 확인/처리하여 Queue를 비워주게 됨.따라서 예외처리를 위한 용도 뿐만 아니라 Overflow 방지를 위해서 주기적인 poll()함수 호출은 필요함
마지막 flush()는 무슨 용도?
- 동기 전송,비동기 전송 모두 레코드 건건이 Broker로 즉시 전송하지는 않음
- Producer내부에 Queue공간이 존재하며 Queue에 어느정도 데이터가 쌓았다가 배치 단위로 전송하게 되며, 전송할 조건이 충족되지 않으면 어느 정도 대기 후 전송하게 됨 → (linger.ms=5ms)
- 만약 전송할 데이터가 작아 순식간에 produce()로직이 완료되고 프로그램이 종료된다면 Broker로 전송하기 전 Queue에 남아있는 상태로 프로그램이 종료될 위험이 존재함
- 이를 방지하기 위해 Queue에 쌓인 레코드를 브로커로 모두 전송,ack응답까지 확인하도록 하는 flush()는 프로그램 종료 전 필히 사용해야 함
전체 코드
from confluent_kafka import Producer
import sys
import time
BROKER_LST = 'kafka01:9092,kafka02:9092,kafka03:9092'
class SimpleProducer:
# 생성자 파라미터로 topic명과 얼마나 돌릴건지 duration을 받음.
def __init__(self, topic, duration=None):
self.topic = topic
self.duration = duration if duration is not None else 60
self.conf = {'bootstrap.servers': BROKER_LST}
self.producer = Producer(self.conf)
# Optional per-message delivery callback (triggered by poll() or flush())
# when a message has been successfully delivered or permanently
# failed delivery (after retries).
def delivery_callback(self, err, msg):
if err:
sys.stderr.write('%% Message failed delivery: %s\\n' % err)
else:
sys.stderr.write('%% Message delivered to %s [%d] @ %d\\n' %
(msg.topic(), msg.partition(), msg.offset()))
def produce(self):
cnt = 0
while cnt < self.duration:
try:
# Produce line (without newline)
self.producer.produce(
topic=self.topic,
key=str(cnt),
value=f'hello world: {cnt}',
on_delivery=self.delivery_callback)
except BufferError:
sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\\n' %
len(self.producer))
# Serve delivery callback queue.
# NOTE: Since produce() is an asynchronous API this poll() call
# will most likely not serve the delivery callback for the
# last produce()d message.
self.producer.poll(0)
cnt += 1
time.sleep(1) # 1초 대기
# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\\n' % len(self.producer))
self.producer.flush()
if __name__ == '__main__':
simple_producer = SimpleProducer(topic='pcy.simple.producer', duration=60)
simple_producer.produce()
그래서 직접 돌려봅시다.
.png)
python 스크립트 돌리면
.png)
잘 consume된다~
댓글남기기