Debezium
- CDC(change data capture)의 대표적인 오픈소스
- Oracle / MySQL / MongoDB / Cassandra 등 다양한 데이터베이스 지원
- 신규 버전을 계속해서 릴리즈하고 있으며 Reference 문서, 커뮤니티 등 활성화되어 있음
DB 트랜잭션 로그를 이용하여 데이터 변경 사항을 실시간으로 캡처하여 스트리밍 하는 오픈 소스
Connector | Role | Description |
Source Connector | PRODUCER | 데이터 변경 발생시 MSK로 실시간 데이터 전송 |
Sink Connector | CONSUMER | Target DB에 데이터를 적재, 대표적으로 JDBC Sink Connector |
이번 포스트에는 Debezium을 통해 Source connector로써 활용하여 Source DB의 변경사항 로그를 kafka로 보내는 파이프라인 Hands-on을 AWS Managed 제품을 이용해서 구성해보겠습니다.
1. Aurora MySQL 설치 및 설정하기
체크할 점
- 주요 파라미터 값 체크 및 파라미터그룹을 통해 설정
- 같은 VPC 대역 사용
- 테스트 데이터는 로컬에서 붙어야 하므로 bastion server를 통해 tunneling 하여 접속
Aurora MySQL로 설치하고 나서 확인해야 할 사항이 아래와 같은데, 보통은 다 아래대로 설정되어 있거나, MSK에서 Debezium connector를 MySQL Instance로 설정하는 순간 log_bin 이 OFF이던 것이 자동으로 ON으로 바뀝니다.
SHOW VARIABLES LIKE 'binlog_format'; -- ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- FULL
SHOW VARIABLES LIKE 'server_id'; -- 고유값 (필수)
SHOW VARIABLES LIKE 'log_bin'; -- ON
SHOW VARIABLES LIKE 'binlog_checksum'; -- CRC32 OR NONE
이외의 설정이 필요한 것은 파라미터그룹을 통해 설정하고 적용합니다.
*writer instance를 reboot해야 parameter group이 적용됩니다.
DDL
CREATE DATABASE cdc;
CREATE TABLE cdc_test_user (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(100),
email VARCHAR(255),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
DML
INSERT INTO cdc.cdc_test_user (name, email)
VALUES ('Alice', 'alice@example.com'),
('Bob', 'bob@example.com');
UPDATE cdc.cdc_test_user
SET email = 'alice@newdomain.com'
WHERE name = 'Alice';
DELETE FROM cdc.cdc_test_user
WHERE name = 'Bob';
2. MSK 생성 및 Connect, worker 설정
1) MSK 클러스터 생성
- MSK 또한 AuroraDB와 같은 대역의 VPC에 생성을 합니다.
- 테스트 용이니 노드의 크기, 저장공간, 각종 불필요한 기능들을 최소하여 클러스터를 띄웁니다.(약 20분소요)
2) Plugin 설정
https://docs.aws.amazon.com/ko_kr/msk/latest/developerguide/msk-connect-config-provider.html
튜토리얼: 구성 공급자를 사용하여 민감한 정보 외부화 - Amazon Managed Streaming for Apache Kafka
이 페이지에 작업이 필요하다는 점을 알려 주셔서 감사합니다. 실망시켜 드려 죄송합니다. 잠깐 시간을 내어 설명서를 향상시킬 수 있는 방법에 대해 말씀해 주십시오.
docs.aws.amazon.com
를 기준으로 debezium connector를 설정해줍니다.
3) worker 설정
MSK Connect는 JVM 프로세스 Worker로 구성되어 있으며 Worker가 Task를 수행하게 됩니다.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
-- __amazon_msk_connect_offset DB log 오프셋 기록 토픽 파티션 및 리플리케이션 수
offset.storage.partitions=3
offset.storage.replication.factor=3
-- __amazon_msk_connect_config 커넥터 내부 구성정보 저장 토픽 리플리케이션 수(partitions은 변경불가)
config.storage.replication.factor=3
-- __amazon_msk_connect_status 작업 구성상태 변경 기록 토픽 파티션 및 리플리케이션 수
status.storage.partitions=3
status.storage.replication.factor=3
4) connector설정
생성한 plugin과 worker를 이용해서 connector를 만듭니다.
https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-adding-configuration
Debezium connector for MySQL :: Debezium Documentation
Time, date, and timestamps can be represented with different kinds of precision, including: adaptive_time_microseconds (the default) captures the date, datetime and timestamp values exactly as in the database using either millisecond, microsecond, or nanos
debezium.io
- database.hostname 가 aurora의 경우 write only db endpoint를 입력
- schema.history.internal.kafka.bootstrap.servers 카프카 브로커 주소 입력
- IAM 이용해서 connector 사용해야 하지만, 테스트를 위해서 None으로 사
connector.class=io.debezium.connector.mysql.MySqlConnector
#schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM
#schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM
schema.history.internal.consumer.security.protocol=PLAINTEXT
schema.history.internal.producer.security.protocol=PLAINTEXT
#schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
#schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
#schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
#schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
schema.history.internal.store.only.captured.tables.ddl=true
schema.history.internal.store.only.captured.databases.ddl=true
schema.history.internal.kafka.topic=schema-changes.product
schema.history.internal.kafka.bootstrap.servers=b-1.cdckafkacluster.cb98pv.c2.kafka.ap-northeast-2.amazonaws.com:9092
database.history.kafka.topic=db.source.cdc-test
database.history.kafka.recovery.attempts=4
database.history.kafka.recovery.poll.interval.ms=100
database.history.kafka.query.timeout.ms=3000
database.hostname=cdc-db.cluster-ceyshgy976br.ap-northeast-2.rds.amazonaws.com
database.user=admin
database.password=비밀번호
database.connectionTimeZone=Asia/Seoul
database.port=3306
database.server.id=661753923
database.server.name=AuroraCDC
snapshot.locking.mode=none
database.include.list=cdc
table.include.list=cdc.cdc_test_user
transforms=changes,moveHeadersToValue,convertTimezone
transforms.convertTimezone.converted.timezone=Asia/Seoul
transforms.convertTimezone.type=io.debezium.transforms.TimezoneConverter
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.moveHeadersToValue.operation=move
transforms.moveHeadersToValue.type=io.debezium.transforms.HeaderToValue
transforms.moveHeadersToValue.fields=ChangedFields
transforms.changes.header.changed.name=Changed
transforms.moveHeadersToValue.headers=Changed
include.schema.changes=true
poll.interval.ms=30000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
heartbeat.interval.ms=1000
tasks.max=1
tombstones.on.delete=true
topic.prefix=AuroraCDC
5) EC2(Bastion)에 MSK Client 설치 및 확인
sudo yum update -y # 또는 apt update -y (Ubuntu인 경우)
sudo yum install -y java-1.8.0-openjdk unzip wget
# kafka 바이너리 다운로드 압축해제
wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -xvzf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1
브로커 주소 확인 & 접속 테스트
AWS Console → MSK 클러스터 → 클러스터 정보 → 브로커 엔드포인트 (plaintext 또는 TLS) 확인
bin/kafka-topics.sh --bootstrap-server <브로커주소:포트> --list
bin/kafka-topics.sh --bootstrap-server b-1.cdckafkacluster.cb98pv.c2.kafka.ap-northeast-2.amazonaws.com:9092 --list
# 9092 포트는 plaintext로 msk설정에서 설정을 안하면 9094 포트 tls밖에 안된다.
bin/kafka-topics.sh --bootstrap-server <브로커주소:포트> --topic <토픽명> --describe
bin/kafka-topics.sh --bootstrap-server b-1.cdckafkacluster.cb98pv.c2.kafka.ap-northeast-2.amazonaws.com:9092 --topic __amazon_msk_connect_status_cdc-dbzm-1_f709d521-c797-446f-9fd7-6df86acf3896-2 --describe
MSK의 기본 properties중에, 토픽이 없는 경우 생성 하는 properties 기본 값은 아래와 같습니다.
(실제 msk 클러스터에서도 기본 properties 확인 해보면 false로 되어 있습니다.)
auto.create.topics.enable=false
https://docs.aws.amazon.com/ko_kr/msk/latest/developerguide/msk-default-configuration.html
6) CDC 로그 테스트
UPDATE cdc.cdc_test_user
SET email = 'alice@newdomain.com'
WHERE name = 'Alice';
DELETE FROM cdc.cdc_test_user
WHERE name = 'Bob';
실제 토픽의 record를 확인
bin/kafka-console-consumer.sh --bootstrap-server b-1.cdckafkacluster.cb98pv.c2.kafka.ap-northeast-2.amazonaws.com:9092 --topic AuroraCDC.cdc.cdc_test_user --from-beginning
AuroraCDC.cdc.cdc_test_user topic 생성이 되고 거기에 record가 저장
댓글