3 분 소요

[kafka]kafka Producer Consumer 비동기처리

kafka topic 생성부터

먼저 topic을 생성해보자..image (63)

./kafka-topics --bootstrap-server kafka01:9092 
--create --topic pcy.simple.producer --partitions 6
  • pcy.simple.producer이라는 토픽명으로 파티션은 6개를 주었음
  • default replication-factor값이 3이라 복제본은 3개씩 생성됨

이를 describe해보면..

image (64)

다음과 같은 토픽을 생성하고 이제 consume을 돌려보겠음

image (65)

./kafka-console-consumer --bootstrap-server kafka01:9092 
--topic pcy.simple.producer 
--property "print.key=true"  # print.key key를 출력할지 여부
--property "key.separator=|" # key 출력시 구분자
--property "print.partition=true" # 메시지의 파티션 출력 여부
--property "print.offset=true" # 메시지의 offset 출력 여부 

console에서 다음과 같이 컨슘상태에 들어가면 이제 Python 스크립트를 돌려보자

Python producer code

BROKER_LST = 'kafka01:9092,kafka02:9092,kafka03:9092'

class SimpleProducer:
		# 생성자 파라미터로 topic명과 얼마나 돌릴건지 duration을 받음.
    def __init__(self, topic, duration=None):
        self.topic = topic
        self.duration = duration if duration is not None else 60
        self.conf = {'bootstrap.servers': BROKER_LST}

        self.producer = Producer(self.conf)# Producer 정의
def produce(self):
        cnt = 0
        while cnt < self.duration: 
            try: 
                # 단순하게 cli상에서 property값을 출력하는 용도
                self.producer.produce(
                    topic=self.topic,
                    key=str(cnt),
                    value=f'hello world: {cnt}',
                    on_delivery=self.delivery_callback) # 비동기 처리에 있어 핵심 파라미터

            except BufferError:
                sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\\n' %
                                 len(self.producer))

            # Serve delivery callback queue.
            # NOTE: Since produce() is an asynchronous API this poll() call
            #       will most likely not serve the delivery callback for the
            #       last produce()d message.
            self.producer.poll(0)
            cnt += 1
            time.sleep(1)  # 1초 대기

        # Wait until all messages have been delivered
        sys.stderr.write('%% Waiting for %d deliveries\\n' % len(self.producer))
        self.producer.flush()
        # Producer쪽 Queue에 남아있는 데이터 있을까봐 마지막에 flush처리

Producer의 동기 전송 vs비동기 전송

  • producer 메시지 전송시 동기전송 방식비동기 전송 방식을 선택할 수 있음
  • 일반적으로 Producer의 성능 향상을 위해 비동기 전송 방식을 채택하는데..
  • 비동기 처리는 위 코드의 producer() 함수에 on_delivery 파리미터에 CallBack함수를 넣어 처리가능
    • producer.produce(topic, key="key", value="value", on_delivery=acked)
  • delivery_callback 함수
 # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(self, err, msg): 
        if err: # err가 전송이 잘 되었다면 null값이 들어오고 에러시 로그 받음)
            sys.stderr.write('%% Message failed delivery: %s\\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d] @ %d\\n' %
                             (msg.topic(), msg.partition(), msg.offset()))

poll함수는 왜씀?

poll()함수의 용도는 Broker로 전송한 레코드에 대한 callback함수 호출 & 메시지의 정상 처리 여부를 확인하는 용도

image (66)

poll()함수를 써야 하는 이유

  1. 예외처리 : 메시지의 정상 처리 여부 판단 후 예외 처리를 위한 용도
  2. QueueOverflow방지 : 비동기 방식 전송시 Broker에게 받은 Ack응답 또한 Queue에 쌓이게 되며 poll()을 호출해야 Queue에 쌓인 ack응답을 확인/처리하여 Queue를 비워주게 됨.따라서 예외처리를 위한 용도 뿐만 아니라 Overflow 방지를 위해서 주기적인 poll()함수 호출은 필요함

마지막 flush()는 무슨 용도?

  • 동기 전송,비동기 전송 모두 레코드 건건이 Broker로 즉시 전송하지는 않음
  • Producer내부에 Queue공간이 존재하며 Queue에 어느정도 데이터가 쌓았다가 배치 단위로 전송하게 되며, 전송할 조건이 충족되지 않으면 어느 정도 대기 후 전송하게 됨 → (linger.ms=5ms)
  • 만약 전송할 데이터가 작아 순식간에 produce()로직이 완료되고 프로그램이 종료된다면 Broker로 전송하기 전 Queue에 남아있는 상태로 프로그램이 종료될 위험이 존재함
    • 이를 방지하기 위해 Queue에 쌓인 레코드를 브로커로 모두 전송,ack응답까지 확인하도록 하는 flush()는 프로그램 종료 전 필히 사용해야 함

전체 코드

from confluent_kafka import Producer
import sys
import time

BROKER_LST = 'kafka01:9092,kafka02:9092,kafka03:9092'

class SimpleProducer:
		# 생성자 파라미터로 topic명과 얼마나 돌릴건지 duration을 받음.
    def __init__(self, topic, duration=None):
        self.topic = topic
        self.duration = duration if duration is not None else 60
        self.conf = {'bootstrap.servers': BROKER_LST}

        self.producer = Producer(self.conf) 

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(self, err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d] @ %d\\n' %
                             (msg.topic(), msg.partition(), msg.offset()))

    def produce(self):
        cnt = 0
        while cnt < self.duration:
            try:
                # Produce line (without newline)
                self.producer.produce(
                    topic=self.topic,
                    key=str(cnt),
                    value=f'hello world: {cnt}',
                    on_delivery=self.delivery_callback)

            except BufferError:
                sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\\n' %
                                 len(self.producer))

            # Serve delivery callback queue.
            # NOTE: Since produce() is an asynchronous API this poll() call
            #       will most likely not serve the delivery callback for the
            #       last produce()d message.
            self.producer.poll(0)
            cnt += 1
            time.sleep(1)  # 1초 대기

        # Wait until all messages have been delivered
        sys.stderr.write('%% Waiting for %d deliveries\\n' % len(self.producer))
        self.producer.flush()

if __name__ == '__main__':
    simple_producer = SimpleProducer(topic='pcy.simple.producer', duration=60)
    simple_producer.produce()
    

그래서 직접 돌려봅시다.

image (67)

python 스크립트 돌리면

image (68)

잘 consume된다~

댓글남기기