본문 바로가기
BigData/kafka

[Kafka] Aurora(MySQL) & Debezium connector & MSK Hands-on

by 스파이디웹 2025. 6. 16.
728x90

Debezium

  • CDC(change data capture)의 대표적인 오픈소스
  • Oracle / MySQL / MongoDB / Cassandra 등 다양한 데이터베이스 지원
  • 신규 버전을 계속해서 릴리즈하고 있으며 Reference 문서, 커뮤니티 등 활성화되어 있음

DB 트랜잭션 로그를 이용하여 데이터 변경 사항을 실시간으로 캡처하여 스트리밍 하는 오픈 소스

Connector Role Description
Source Connector PRODUCER 데이터 변경 발생시 MSK로 실시간 데이터 전송
Sink Connector CONSUMER Target DB에 데이터를 적재, 대표적으로 JDBC Sink Connector

 

이번 포스트에는 Debezium을 통해 Source connector로써 활용하여 Source DB의 변경사항 로그를 kafka로 보내는 파이프라인 Hands-on을 AWS Managed 제품을 이용해서 구성해보겠습니다.


1. Aurora MySQL 설치 및 설정하기

체크할 점

  • 주요 파라미터 값 체크 및 파라미터그룹을 통해 설정
  • 같은 VPC 대역 사용
  • 테스트 데이터는 로컬에서 붙어야 하므로 bastion server를 통해 tunneling 하여 접속

 

Aurora MySQL로 설치하고 나서 확인해야 할 사항이 아래와 같은데, 보통은 다 아래대로 설정되어 있거나, MSK에서 Debezium connector를 MySQL Instance로 설정하는 순간 log_bin 이 OFF이던 것이 자동으로 ON으로 바뀝니다.

SHOW VARIABLES LIKE 'binlog_format';     -- ROW
SHOW VARIABLES LIKE 'binlog_row_image';  -- FULL
SHOW VARIABLES LIKE 'server_id';         -- 고유값 (필수)
SHOW VARIABLES LIKE 'log_bin';           -- ON
SHOW VARIABLES LIKE 'binlog_checksum';  -- CRC32 OR NONE

이외의 설정이 필요한 것은 파라미터그룹을 통해 설정하고 적용합니다.

*writer instance를 reboot해야 parameter group이 적용됩니다.

DDL

CREATE DATABASE cdc;

CREATE TABLE cdc_test_user (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100),
    email VARCHAR(255),
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

DML

INSERT INTO cdc.cdc_test_user (name, email)
VALUES ('Alice', 'alice@example.com'),
       ('Bob', 'bob@example.com');

UPDATE cdc.cdc_test_user
SET email = 'alice@newdomain.com'
WHERE name = 'Alice';

DELETE FROM cdc.cdc_test_user
WHERE name = 'Bob';

2. MSK 생성 및 Connect, worker 설정

1) MSK 클러스터 생성

  • MSK 또한 AuroraDB와 같은 대역의 VPC에 생성을 합니다.
  • 테스트 용이니 노드의 크기, 저장공간, 각종 불필요한 기능들을 최소하여 클러스터를 띄웁니다.(약 20분소요)

2) Plugin 설정

https://docs.aws.amazon.com/ko_kr/msk/latest/developerguide/msk-connect-config-provider.html

 

튜토리얼: 구성 공급자를 사용하여 민감한 정보 외부화 - Amazon Managed Streaming for Apache Kafka

이 페이지에 작업이 필요하다는 점을 알려 주셔서 감사합니다. 실망시켜 드려 죄송합니다. 잠깐 시간을 내어 설명서를 향상시킬 수 있는 방법에 대해 말씀해 주십시오.

docs.aws.amazon.com

를 기준으로 debezium connector를 설정해줍니다.


3) worker 설정

MSK Connect는 JVM 프로세스 Worker로 구성되어 있으며 Worker가 Task를 수행하게 됩니다.

 

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

-- __amazon_msk_connect_offset DB log 오프셋 기록 토픽 파티션 및 리플리케이션 수
offset.storage.partitions=3
offset.storage.replication.factor=3

-- __amazon_msk_connect_config 커넥터 내부 구성정보 저장 토픽 리플리케이션 수(partitions은 변경불가)
config.storage.replication.factor=3

-- __amazon_msk_connect_status 작업 구성상태 변경 기록 토픽 파티션 및 리플리케이션 수
status.storage.partitions=3
status.storage.replication.factor=3

4) connector설정

생성한 plugin과 worker를 이용해서 connector를 만듭니다.

https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-adding-configuration

 

Debezium connector for MySQL :: Debezium Documentation

Time, date, and timestamps can be represented with different kinds of precision, including: adaptive_time_microseconds (the default) captures the date, datetime and timestamp values exactly as in the database using either millisecond, microsecond, or nanos

debezium.io

  • database.hostname 가 aurora의 경우 write only db endpoint를 입력
  • schema.history.internal.kafka.bootstrap.servers 카프카 브로커 주소 입력
  • IAM 이용해서 connector 사용해야 하지만, 테스트를 위해서 None으로 사
connector.class=io.debezium.connector.mysql.MySqlConnector
#schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM
#schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM
schema.history.internal.consumer.security.protocol=PLAINTEXT
schema.history.internal.producer.security.protocol=PLAINTEXT
#schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
#schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
#schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
#schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
schema.history.internal.store.only.captured.tables.ddl=true
schema.history.internal.store.only.captured.databases.ddl=true
schema.history.internal.kafka.topic=schema-changes.product
schema.history.internal.kafka.bootstrap.servers=b-1.cdckafkacluster.cb98pv.c2.kafka.ap-northeast-2.amazonaws.com:9092

database.history.kafka.topic=db.source.cdc-test
database.history.kafka.recovery.attempts=4
database.history.kafka.recovery.poll.interval.ms=100
database.history.kafka.query.timeout.ms=3000
database.hostname=cdc-db.cluster-ceyshgy976br.ap-northeast-2.rds.amazonaws.com
database.user=admin
database.password=비밀번호
database.connectionTimeZone=Asia/Seoul
database.port=3306

database.server.id=661753923
database.server.name=AuroraCDC
snapshot.locking.mode=none

database.include.list=cdc
table.include.list=cdc.cdc_test_user

transforms=changes,moveHeadersToValue,convertTimezone
transforms.convertTimezone.converted.timezone=Asia/Seoul
transforms.convertTimezone.type=io.debezium.transforms.TimezoneConverter
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.moveHeadersToValue.operation=move
transforms.moveHeadersToValue.type=io.debezium.transforms.HeaderToValue
transforms.moveHeadersToValue.fields=ChangedFields
transforms.changes.header.changed.name=Changed
transforms.moveHeadersToValue.headers=Changed

include.schema.changes=true
poll.interval.ms=30000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

heartbeat.interval.ms=1000
tasks.max=1
tombstones.on.delete=true

topic.prefix=AuroraCDC

 


5) EC2(Bastion)에 MSK Client 설치 및 확인

sudo yum update -y      # 또는 apt update -y (Ubuntu인 경우)
sudo yum install -y java-1.8.0-openjdk unzip wget

# kafka 바이너리 다운로드 압축해제
wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -xvzf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1

 

브로커 주소 확인 & 접속 테스트

AWS Console → MSK 클러스터 → 클러스터 정보 → 브로커 엔드포인트 (plaintext 또는 TLS) 확인

bin/kafka-topics.sh --bootstrap-server <브로커주소:포트> --list
bin/kafka-topics.sh --bootstrap-server b-1.cdckafkacluster.cb98pv.c2.kafka.ap-northeast-2.amazonaws.com:9092 --list
# 9092 포트는 plaintext로 msk설정에서 설정을 안하면 9094 포트 tls밖에 안된다.
bin/kafka-topics.sh --bootstrap-server <브로커주소:포트> --topic <토픽명> --describe
bin/kafka-topics.sh --bootstrap-server b-1.cdckafkacluster.cb98pv.c2.kafka.ap-northeast-2.amazonaws.com:9092 --topic __amazon_msk_connect_status_cdc-dbzm-1_f709d521-c797-446f-9fd7-6df86acf3896-2 --describe

MSK의 기본 properties중에, 토픽이 없는 경우 생성 하는 properties 기본 값은 아래와 같습니다.
(실제 msk 클러스터에서도 기본 properties 확인 해보면 false로 되어 있습니다.)

auto.create.topics.enable=false

https://docs.aws.amazon.com/ko_kr/msk/latest/developerguide/msk-default-configuration.html

 

6) CDC 로그 테스트

UPDATE cdc.cdc_test_user
SET email = 'alice@newdomain.com'
WHERE name = 'Alice';

DELETE FROM cdc.cdc_test_user
WHERE name = 'Bob';

 

실제 토픽의 record를 확인

bin/kafka-console-consumer.sh --bootstrap-server b-1.cdckafkacluster.cb98pv.c2.kafka.ap-northeast-2.amazonaws.com:9092 --topic AuroraCDC.cdc.cdc_test_user --from-beginning

AuroraCDC.cdc.cdc_test_user topic 생성이 되고 거기에 record가 저장

 

 

 

728x90

댓글