[Kafka] Setting up Kafka + ZooKeeper + CMAK with docker-compose on Ubuntu 20.04 LTS

스파이디웹 2024-03-05 20:32

In this post, I'll use Docker Compose to quickly bring up a Kafka broker, ZooKeeper, and CMAK (Cluster Manager for Apache Kafka) as containers for use during development.


1. Create an EC2 instance

Launch an Ubuntu 20.04 LTS instance of type t2.medium (2 vCPUs, 4 GiB memory).

Assign it a public IP for easy access (test use only).


2. Install docker + docker-compose

1) Install docker

# Update the apt package index
$ sudo apt-get update

# Install the packages apt needs to use a repository over HTTPS
$ sudo apt-get install -y apt-transport-https ca-certificates curl gnupg-agent software-properties-common

# Add Docker's official GPG key and verify its fingerprint
$ curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
$ sudo apt-key fingerprint 0EBFCD88

pub   rsa4096 2017-02-22 [SCEA]
      9DC8 5822 9FC7 DD38 854A  E2D8 8D81 803C 0EBF CD88
uid           [ unknown] Docker Release (CE deb) <docker@docker.com>
sub   rsa4096 2017-02-22 [S]

# Add the Docker repository and update the apt index again
$ sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
$ sudo apt-get update

# Install Docker
$ sudo apt-get install -y docker-ce docker-ce-cli containerd.io

# Start Docker now and enable it at boot
$ sudo systemctl start docker
$ sudo systemctl enable docker.service
$ sudo systemctl enable containerd.service

# Verify the installation
$ sudo docker run hello-world


2) Install docker-compose

Download the standalone docker-compose binary:

$ sudo curl -SL https://github.com/docker/compose/releases/download/v2.24.6/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose

Make it executable:

$ sudo chmod +x /usr/local/bin/docker-compose
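The download URL follows the release-asset pattern `.../releases/download/<version>/docker-compose-<os>-<arch>`. The hypothetical helper below is a sketch that assumes this naming scheme holds for the version you pick. It builds the URL from Python's `platform.system()` and `platform.machine()`, which on Linux report the same values as `uname -s` and `uname -m`. This is useful on a non-x86_64 instance, such as an aarch64 one.

```python
import platform

COMPOSE_RELEASES = "https://github.com/docker/compose/releases/download"


def compose_download_url(version, system, machine):
    """Build a standalone docker-compose v2 binary URL.

    Assumes the docker-compose-<os>-<arch> asset naming used by v2 releases,
    with the OS name in lower case (e.g. docker-compose-linux-x86_64).
    """
    return f"{COMPOSE_RELEASES}/{version}/docker-compose-{system.lower()}-{machine}"


def current_machine_url(version):
    """URL for the machine running this script (intended for Linux hosts)."""
    return compose_download_url(version, platform.system(), platform.machine())
```

On the x86_64 Ubuntu instance, `current_machine_url("v2.24.6")` reproduces the URL used in the curl command.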

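3. Write docker-compose.yml

Create a kafka directory and a docker-compose.yml file inside it.

$ mkdir kafka
$ cd kafka
$ nano docker-compose.yml

Paste in the following contents. Replace `ip` in KAFKA_ADVERTISED_LISTENERS with the EC2 instance's public IP, because clients connect to whatever address the broker advertises.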
version: '2'

services:
  zookeeper:
    image: zookeeper:3.5.8
    hostname: zookeeper
    container_name: zookeeper
    environment:
      # The official zookeeper image reads ZOO_* variables (its client port defaults to 2181)
      ZOO_MY_ID: 1
      ZOO_TICK_TIME: 2000
      ZOO_INIT_LIMIT: 5
      ZOO_SYNC_LIMIT: 2
    ports:
      - "2181:2181"
    networks:
      - kafka_net

  kafka:
    image: bitnami/kafka:3.3.2
    hostname: kafka
    container_name: kafka
    volumes:
      # bitnami/kafka persists its data under /bitnami/kafka
      - kafka-data:/bitnami/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      JMX_PORT: 9999
      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote=true 
                      -Dcom.sun.management.jmxremote.authenticate=false 
                      -Dcom.sun.management.jmxremote.ssl=false
                      -Dcom.sun.management.jmxremote.rmi.port=9393 
                      -Djava.net.preferIPv4Stack=true
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://ip:9092,PLAINTEXT_HOST://ip:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    networks:
      - kafka_net

  kafka-manager:
    image: hlebalbau/kafka-manager:latest
    container_name: kafka-manager
    mem_limit: 512m
    restart: always
    depends_on:
      - kafka
      - zookeeper
    ports:
      - 9000:9000
    environment:
      ZK_HOSTS: "zookeeper:2181"
    networks:
      - kafka_net

networks:
  kafka_net:
    name: kafka_net
    driver: bridge

volumes:
  kafka-data:
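The listener settings are the part of this file that most often goes wrong. Kafka refuses to start in any of these cases:

- an advertised listener name is missing from the bound listeners;
- a listener has no entry in an explicitly set security-protocol map;
- the inter-broker listener is not advertised.

The sketch below is a hypothetical pre-flight check in plain Python, with no Kafka or YAML library. It applies those rules to the environment values as strings. It mirrors some of Kafka's startup checks but is not Kafka's own validation code.

```python
def parse_listeners(value):
    """Parse 'NAME://host:port,...' into {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


def parse_protocol_map(value):
    """Parse 'NAME:PROTOCOL,...' into {NAME: PROTOCOL}."""
    return dict(entry.strip().split(":", 1) for entry in value.split(","))


def check_listeners(listeners, advertised, protocol_map, inter_broker):
    """Return a list of problems; an empty list means the settings look consistent."""
    bound = parse_listeners(listeners)
    adv = parse_listeners(advertised)
    protocols = parse_protocol_map(protocol_map)
    problems = []
    for name in adv:
        if name not in bound:
            problems.append(f"{name} is advertised but missing from KAFKA_LISTENERS")
    for name in sorted(set(bound) | set(adv)):
        if name not in protocols:
            problems.append(f"{name} has no entry in KAFKA_LISTENER_SECURITY_PROTOCOL_MAP")
    if inter_broker not in adv:
        problems.append(f"inter-broker listener {inter_broker} is not advertised")
    return problems
```

When it is given the four listener values from the compose file, it returns an empty list. An advertised listener such as `EXTERNAL://ip:9094` that has no matching bound listener or protocol mapping would be reported twice.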

4. Start the containers

$ sudo docker-compose -f docker-compose.yml up -d

Check that the containers are running:

$ sudo docker ps
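The `docker ps` check can also be scripted. This sketch assumes the three `container_name` values from docker-compose.yml. It uses `docker ps --format '{{.Names}}'`, which prints one running container name per line. The helper names are hypothetical.

```python
import subprocess

# Container names set via container_name in docker-compose.yml
EXPECTED_CONTAINERS = {"zookeeper", "kafka", "kafka-manager"}


def missing_containers(ps_names_output, expected=EXPECTED_CONTAINERS):
    """Return the expected names absent from `docker ps --format '{{.Names}}'` output."""
    running = {line.strip() for line in ps_names_output.splitlines() if line.strip()}
    return set(expected) - running


def check_running():
    """Run docker ps (with sudo, as in the commands above) and report missing containers."""
    result = subprocess.run(
        ["sudo", "docker", "ps", "--format", "{{.Names}}"],
        capture_output=True, text=True, check=True,
    )
    missing = missing_containers(result.stdout)
    print("all containers are running" if not missing else f"not running: {sorted(missing)}")
    return missing
```

Calling `check_running()` on the EC2 instance prints which of the three containers, if any, are not up.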


5. Check the CMAK (Cluster Manager for Apache Kafka) UI and register the Kafka broker (standalone cluster)

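To reach the CMAK UI, the EC2 security group should strictly allow only my local PC's IP on the required ports. For convenience, I opened all IPs and ports instead (test use only).

Browse to <public ip>:9000 to see the CMAK UI.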
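If the CMAK page does not load, a common cause is the security group rather than the containers. A TCP reachability check run from the local PC helps tell the two apart. This sketch uses only the standard `socket` module. The `report()` helper and its port list are assumptions of this example: port 9000 for CMAK and 9092 for the broker, as published in docker-compose.yml.

```python
import socket


def is_port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts
        return False


def report(host):
    """Print reachability of the ports used in this post (9000: CMAK, 9092: Kafka)."""
    for port, label in [(9000, "CMAK UI"), (9092, "Kafka broker")]:
        state = "open" if is_port_open(host, port) else "closed or filtered"
        print(f"{label:12} {host}:{port} -> {state}")
```

`report("<public ip>")` prints one line per port. If both ports show "closed or filtered" while `docker ps` on the instance shows the containers running, the security group is the likely problem.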

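Register the broker via Add Cluster, entering zookeeper:2181 (the ZK_HOSTS value from docker-compose.yml) as the Cluster Zookeeper Hosts.

Create a topic. The test below uses a topic named test.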
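Kafka accepts only topic names that meet these rules:

- at most 249 characters;
- only ASCII letters, digits, '.', '_' and '-';
- not exactly '.' or '..'.

The function below is a hypothetical pre-check that mirrors those rules. It is a sketch, not Kafka's own validator.

```python
import re

MAX_TOPIC_NAME_LENGTH = 249
LEGAL_TOPIC_CHARS = re.compile(r"[a-zA-Z0-9._-]+")


def topic_name_errors(name):
    """Return reasons why `name` is not a legal Kafka topic name (empty list if legal)."""
    errors = []
    if not name:
        errors.append("name is empty")
    if name in (".", ".."):
        errors.append("name cannot be '.' or '..'")
    if len(name) > MAX_TOPIC_NAME_LENGTH:
        errors.append(f"name is longer than {MAX_TOPIC_NAME_LENGTH} characters")
    if name and not LEGAL_TOPIC_CHARS.fullmatch(name):
        errors.append("only ASCII letters, digits, '.', '_' and '-' are allowed")
    return errors
```

`topic_name_errors("test")` returns an empty list. A name with a space, such as "my topic", is rejected.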


6. Kafka produce/consume test

1) Write KafkaMain.py

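import sys
from json import dumps, loads

from kafka import KafkaConsumer, KafkaProducer


def produce(topicName, brokerLists):
    # Serialize each value as JSON text encoded to UTF-8 bytes
    producer = KafkaProducer(bootstrap_servers=brokerLists,
                             value_serializer=lambda x: dumps(x).encode('utf-8'))
    for i in range(100):
        producer.send(topicName, i)
    # send() is asynchronous: flush() blocks until all buffered records are sent
    producer.flush()
    producer.close()


def consume(topicName, brokerLists):
    # With no committed offset for group "test", start from the earliest
    # message so records produced before this consumer started are read too
    consumer = KafkaConsumer(topicName,
                             bootstrap_servers=brokerLists,
                             group_id="test",
                             auto_offset_reset='earliest',
                             value_deserializer=lambda x: loads(x.decode('utf-8')))
    # Runs until interrupted (Ctrl+C), printing every record received
    for msg in consumer:
        print(msg)


if len(sys.argv) != 4:
    print('usage: python3 KafkaMain.py produce|consume <topic> <host:port[,host:port...]>')
    sys.exit(1)

action = sys.argv[1]
topicName = sys.argv[2]
brokerLists = sys.argv[3].split(',')

if action == 'produce':
    produce(topicName, brokerLists)
elif action == 'consume':
    consume(topicName, brokerLists)
else:
    print('wrong arguments')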
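A `value_serializer` like the one in KafkaMain.py turns each Python value into JSON text encoded as UTF-8, so the integer 42 travels as the bytes `b'42'`. If no `value_deserializer` is passed to `KafkaConsumer`, `msg.value` arrives as those raw bytes. The pair of functions below (hypothetical names) shows the encoding and its inverse. Either can be passed as the serializer or deserializer argument.

```python
from json import dumps, loads


def serialize(value):
    """JSON text as UTF-8 bytes: usable as KafkaProducer(value_serializer=...)."""
    return dumps(value).encode("utf-8")


def deserialize(raw):
    """Inverse of serialize(): usable as KafkaConsumer(value_deserializer=...)."""
    return loads(raw.decode("utf-8"))
```

Any JSON-compatible value round-trips unchanged, including the integers 0 to 99 sent by `produce()`.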


2) Install the required libraries

$ sudo apt install python3-pip
$ pip3 install kafka-python

3) produce

$ python3 KafkaMain.py produce test 3.38.247.190:9092

In CMAK, the topic's offset reads 100, confirming that exactly 100 messages were written.
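The offset CMAK reports here is the log end offset, meaning the offset the next record will receive. On a new topic whose offsets start at 0, 100 successful sends therefore leave it at 100. With several partitions, records without a key are spread across partitions, so the message count is a sum over partitions. Once retention deletes old data, the beginning offset also moves up from 0.

The hypothetical helper below is a sketch of that sum. kafka-python's `KafkaConsumer.beginning_offsets()` and `KafkaConsumer.end_offsets()` return dicts of this shape, keyed by `TopicPartition`. For simplicity, the sketch uses plain partition numbers as keys.

```python
def messages_in_topic(begin_offsets, end_offsets):
    """Retained message count: sum over partitions of (end offset - beginning offset).

    Partitions missing from begin_offsets are assumed to start at offset 0.
    """
    return sum(end - begin_offsets.get(partition, 0)
               for partition, end in end_offsets.items())
```

For the single-partition test topic, `messages_in_topic({0: 0}, {0: 100})` is 100. Three partitions ending at 34, 33 and 33 also give 100.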

4) consume

$ python3 KafkaMain.py consume test 3.38.247.190:9092

Exactly 100 messages are consumed, and CMAK shows a lag of 0 for the consumer group.
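Consumer lag is the gap between a partition's log end offset and the offset the group has committed. After group test commits offset 100 on a partition whose end offset is 100, the lag is 100 - 100 = 0.

The sketch below computes this with hypothetical helper names. In kafka-python, `KafkaConsumer.end_offsets()` and `KafkaConsumer.committed(tp)` supply these values. The sketch again uses plain partition numbers in place of `TopicPartition` keys. A partition with nothing committed yet is reported as `None` (unknown) rather than guessed.

```python
from typing import Dict, Optional


def consumer_lag(end_offsets: Dict[int, int],
                 committed: Dict[int, Optional[int]]) -> Dict[int, Optional[int]]:
    """Per-partition lag = log end offset - committed offset (None if nothing committed)."""
    lag: Dict[int, Optional[int]] = {}
    for partition, end in end_offsets.items():
        offset = committed.get(partition)
        lag[partition] = None if offset is None else end - offset
    return lag


def total_lag(lag: Dict[int, Optional[int]]) -> int:
    """Sum of the known per-partition lags."""
    return sum(value for value in lag.values() if value is not None)
```

For this test, `total_lag(consumer_lag({0: 100}, {0: 100}))` is 0, which matches the value CMAK shows.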


