이번 포스트에는 kafka와 Debezium 그리고 CDC구현에 대해서 정리해보겠습니다.
1. Debezium이란?
- database에서 발생하는 변경사항을 추적할 수 있는 일종의 Apache Kafka Connect의 source connector
- 성공적으로 commit이 발생한 데이터에 대해서만 변경사항이 전파되기 때문에 실패한 트랜잭션은 고려할 필요가 없음
- Debezium은 변경사항을 디스크에 저장하기 때문에 데이터의 변경사항을 전달받아야 하는 애플리케이션이 다운되더라도 문제가 없음
- Debezium은 애플리케이션이 여러 DBMS와 호환이 될 수 있도록 middleware 역할을 함
- Kafka와 Kafka Connect를 활용함으로써 durability, reliability, fault tolerance을 보장
2. CDC(Change Data Capture)가 필요한 이유는?
CDC는 단어의 뜻과 같이, 말 그대로 변경데이터를 감지하기 위한 소프트웨어 디자인 패턴입니다.
Source DB로부터 데이터를 ETL 혹은 ELT해올 때, daily하게 배치 단위로 가져온다고 생각해 봤을 때,
Source DB와 분석데이터(Target)의 정합성이 지켜져야 데이터를 활용할 때 의미가 있다고 할 수 있겠습니다.
변경이 없고 삭제가 없는 append만 되는 형태의 테이블이면 문제가 되지 않습니다. 하지만 누군가 임의로 row를 삭제한다거나, update가 자주일어나는 테이블이라던지, flag컬럼을 두어 delte_yn처럼 삭제구분을 두는 것이 아닌 실제로 row자체가 삭제가 되는 테이블의 경우에는 분석데이터에서 해당 변경사항들을 반영해야 됩니다.
실제로 ETL을 해오는 방법은 크게 3가지가 있습니다.
- M: 모든 데이터에 대해서 전체 적재(전체 데이터 Overwrite를 통해 변경사항이 전부 반영된 최신의 데이터를 가져옵니다.)
- HI: Insert만 되는 테이블의 성격에 해당하며, 날짜를 기준으로 증분 데이터만 가져옵니다.
- HIU: Insert 혹은 Update되는 테이블성격에 해당하며 날짜를 기준으로 최신의 데이터(증분 데이터)를 가져옵니다.
이 3가지 모두 쿼리로 적용가능한 ETL 방법이지만, 삭제된 데이터를 반영할 수 있는 것은 M밖에 없습니다. 하지만 데이터가 수억 건 이상으로 넘어가게 된다면, index scan도 의미 없는 full scan 특성 상 운영DB에 엄청난 부하가 가게 될 것이고, 시간과 비용 또한 무시 못하게 됩니다. 결국 삭제를 감지하여 반영할 수 있는 다른 방법이 필요한데 이 때 CDC를 구현하게 됩니다.
3. Debezium의 특징과 기능
1) 특징
- Debezium은 데이터베이스의 로그를 읽기 때문에 모든 데이터의 변경사항을 추적할 수 있음
- Debezium이 다운되더라도 데이터베이스의 로그가 사라지지 않기 때문에 안전
- 데이터베이스의 변경 로그(binlog, logical replication stream 등) 를 직접 읽어서, 데이터 변경 사항을 비동기적으로 캡처
- Log based CDC를 통해 CPU 사용량의 증가 없이 데이터의 변경사항을 준 실시간으로 추적
- polling 방식(SELECT 쿼리 주기적 실행)이 아님
- 스키마의 변경도 감지할 수 있는데, MySQL의 경우 kafka로 보내는 message에 반영이 되어서 보내지고, PostgreSQL의 경우 주기적으로 PostgreSQL system catalog를 polling 하여 DDL 감지를 보완
- Debezium MySQL 커텍터를 구성하여 데이터베이스의 테이블에 적용된 스키마 변경 이벤트를 생성할 수 있음(Schema change topic)
2) 기능
- Snapshot: Debezium이 source database와 처음으로 연결됐거나 또는 모든 로그가 존재하지 않을 경우 활용하는 방식
- Filters: schema, table, column 별로 필터링 여부를 결정
- Masking: 특정 컬럼의 데이터를 masking 할 수 있음, 민감한 데이터(예를 들어 개인정보)를 취급할 때 유용한 기능
- Monitoring: JMX를 통해 connector을 모니터링
- Message transformation: 메시지를 수정할 수 있는 기능
3) Snapshot Mode와 부하 가능성
3-1. Snapshot 이란?
Debezium이 처음 시작될 때, 대상 테이블의 초기 상태(정적 데이터)를 한 번 가져오는 작업을 의미
- DB 테이블 전체를 스캔하여 SELECT로 가져옴(단발성 full scan snapshot)
- 이후에는 binlog (또는 logical replication stream 등)만 지속적으로 tailing
- "전체 테이블의 초기 상태" + "이후 변경사항(CDC)"을 함께 추적해야 정합성 있는 데이터 스트림을 Kafka에 보낼 수 있으므로, 스냅샷을 가져온 이후에 이후의 변경사항들에 대해서 변경 로그로 확인
3-2. Snapshot Mode
모드 이름 | 설명 |
initial (기본값) | 처음 시작 시 전체 테이블을 스냅샷, 이후 binlog로 계속 tailing |
schema_only | 테이블 구조만 가져오고 데이터는 snapshot 안 함 (단, binlog는 계속 읽음) |
initial_only | 최초 snapshot만 수행하고 binlog는 tailing 안 함 (일회성 추출 목적) |
never | snapshot도, binlog도 안 읽음. (기존 offset 기준부터만 tailing) |
when_needed | Debezium이 판단해서 snapshot 수행 여부 결정 (특정 커넥터 지원) |
3-3. 운영에서의 부하 가능성
- DB에 초기 설정시에 snapshot을 찍는(fullscan)을 해야 할 일이 생기고, 이는 운영DB에 부하를 줄 수 있습니다.
- 대량 row SELECT → DB의 디스크 I/O, CPU 자원 사용
- 병렬성 없이 단일 커넥터에서 수행되면 느림
- 다만 대량의 처리가 아닌 이후의 변경 로그만 읽어오는 경우라면 CPU의 부하가 크지 않음
- 부하 관련 요소
항목 | 영향 | 설명 |
Snapshot 테이블 크기 | CPU, Disk I/O | 큰 테이블 SELECT는 I/O와 CPU 모두 부담 |
binlog 크기 (offset 이후) | CPU, Memory | 오래된 binlog를 순차 처리하므로 CPU 부하 증가 가능 |
CDC consumer 처리 속도 | Kafka backlog | Debezium이 빨리 밀어도 Kafka나 downstream이 느리면 부하 쌓임 |
Debezium 인스턴스 리소스 | Memory, GC | JVM 메모리 부족 시 GC 폭증 및 OOM 발생 가능 |
- debezium 재시작 시 흐름
- 커넥터 시작
- Kafka offset 저장소 확인
- 이전에 snapshot이 완료되었고, binlog 위치도 있다면 → snapshot 건너뜀
- 저장된 binlog 위치부터 log-based CDC 수행
- snapshot이 다시 실행되는 경우
상황 | 설명 |
offset.storage.topic 데이터가 사라졌거나 초기화됨 | Debezium은 상태를 모름 → snapshot 다시 수행 |
Kafka Connect의 connector name이 바뀜 | offset이 새로운 이름으로 저장되므로, 새로운 snapshot 수행 |
snapshot.mode=always 로 명시 | 매번 snapshot 수행하도록 설정 |
커넥터 구성 변경 (database.hostname, database.server.name 등) | 내부적으로 새로운 connector로 인식될 수 있음 |
pod가 ephemeral하고, offset을 Kafka가 아닌 로컬에 저장했을 경우 | 재시작 시 로컬 offset 정보 유실 → snapshot 재시작 가능성 높음 |
- 안전한 운영을 위한 고려 요소
항목 권장 | 설정 / 전략 |
Offset 저장 방식 | offset.storage.topic을 Kafka topic으로 설정 |
Connector name | 동일한 이름 유지 (connector 이름 변경 시 새로운 snapshot) |
Snapshot 모드 | 재시작 시 snapshot 방지하려면 snapshot.mode=schema_only 또는 never 고려 |
Offset 백업 | 필요 시 offset topic 백업 (Kafka mirror 등 활용) |
Connector 배포 시 주의 | Helm/Manifest로 connector 생성 시 매번 생성-삭제하지 않도록 주의 |
4. Debezium을 통해 CDC를 구현하는 방법(Debezium의 출력값과 활용)
1) Snapshot
kafka topic에 message는 JSON형태로 저장이 되며 아래와 같은 형태로 저장 됩니다.
- snapshot으로 읽은 row들은 Kafka topic에 CDC 이벤트처럼 produce
- 실제로는 binlog 이벤트와 거의 동일한 구조 (op: r, 즉 read operation)
- "snapshot에서 온 데이터인지"를 source.snapshot=true로 표시해서 구분
{
"op": "r",
"before": null,
"after": { "id": 1, "name": "Alice" },
"source": {
"snapshot": true,
...
}
}
항목 | Snapshot | Binlog tailing |
목적 | 기존 데이터 동기화 | 변경 데이터 추적 |
방식 | SELECT * FROM ... | binlog / WAL 등 stream read |
처리 시점 | 커넥터 최초 기동 시 | snapshot 이후 지속적으로 |
Kafka 전송 형식 | op: r | op: c / u / d (create/update/delete) |
저장 위치 | Kafka topic | Kafka topic |
2) Binlog tailing
{
"before": null,
"after": {
"id": 101,
"name": "Alice",
"email": "alice@example.com"
},
"source": {
"db": "mydb",
"table": "users",
"ts_ms": 1723452340000
},
"op": "c",
"ts_ms": 1723452340156
}
- before: 이전 값 (insert이므로 null)
- after: 새 레코드 값
- op: 'c' = create (insert)
- ts_ms: 발생 시간 (timestamp)
{
"before": {
"id": 101,
"name": "Alice",
"email": "alice@example.com"
},
"after": {
"id": 101,
"name": "Alice",
"email": "alice@newdomain.com"
},
"source": {
"db": "mydb",
"table": "users",
"ts_ms": 1723452399999
},
"op": "u",
"ts_ms": 1723452400100
}
- before: 수정 전 값
- after: 수정 후
- op: 'u' = update
- ts_ms: 발생 시간 (timestamp)
{
"before": {
"id": 101,
"name": "Alice",
"email": "alice@newdomain.com"
},
"after": null,
"source": {
"db": "mydb",
"table": "users",
"ts_ms": 1723452499000
},
"op": "d",
"ts_ms": 1723452500000
}
- before: 삭제 전 값
- after: 삭제 되었으므로 null
- op: 'd' = delete
- ts_ms: 발생 시간 (timestamp)
3) 비교표
항목 | Snapshot | Binlog tailing |
목적 | 기존 데이터 동기화 | 변경 데이터 추적 |
방식 | SELECT * FROM ... | binlog / WAL 등 stream read |
처리 시점 | 커넥터 최초 기동 시 | snapshot 이후 지속적으로 |
Kafka 전송 형식 | op: r | op: c / u / d (create/update/delete) |
저장 위치 | Kafka topic | Kafka topic |
4) CDC 구현
아래와 같은 방법으로 CDC를 구현할 수 있겠습니다.
- Spark로 사용하여 JSON을 파싱하고 op를 기준으로 구분하여 로직에 반영하여 CDC를 구현
- flink는 Debezium Kafka source connector를 공식 지원하고, format = 'debezium-json'을 통해 op에 따른 upsert/delete를 자동 처리
예제 코드
Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, StructField
# Kafka에서 메시지 읽기
spark = SparkSession.builder \
.appName("DebeziumCDC") \
.getOrCreate()
# JSON 구조 정의 (필요에 따라 더 자세히)
value_schema = StructType([
StructField("before", StructType([
StructField("id", StringType()),
StructField("name", StringType()),
StructField("email", StringType())
])),
StructField("after", StructType([
StructField("id", StringType()),
StructField("name", StringType()),
StructField("email", StringType())
])),
StructField("op", StringType()),
StructField("ts_ms", StringType())
])
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "mydb.users") \
.option("startingOffsets", "latest") \
.load()
# Kafka value를 JSON으로 파싱
parsed_df = df.selectExpr("CAST(value AS STRING)") \
.withColumn("data", from_json(col("value"), value_schema)) \
.select("data.*")
# INSERT, UPDATE, DELETE 처리 분기
insert_df = parsed_df.filter(col("op") == "c").select("after.*")
update_df = parsed_df.filter(col("op") == "u").select("after.*")
delete_df = parsed_df.filter(col("op") == "d").select("before.*")
# 예: 콘솔에 출력하거나, HDFS/DB로 저장
insert_df.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", False) \
.start()
spark.streams.awaitAnyTermination()
flink
CREATE TABLE users_cdc (
id STRING,
name STRING,
email STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'mydb.users',
'properties.bootstrap.servers' = 'broker:9092',
'format' = 'debezium-json'
);
CREATE TABLE users_sink (
id STRING,
name STRING,
email STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://sink-db:3306/sink',
'table-name' = 'users_sink',
'username' = 'flink',
'password' = 'flinkpw'
);
-- 실시간 CDC insert/update/delete 반영
INSERT INTO users_sink
SELECT * FROM users_cdc;
5. Kafka와 Debezium 연동
기본적으로, 하나의 데이터베이스 테이블에서 발생한 변경 사항은 하나의 kafka 토픽에 기록
1) Kafka Debezium 파이프라인 구조
이외에 지원 가능한 소스 종류
지원 소스 | 비고 |
MySQL | 매우 안정적 |
PostgreSQL | 매우 안정적 |
MongoDB | NoSQL도 지원함 |
SQL Server | CDC 기능 필요 |
Oracle | 기업용 환경에서 사용 가능 |
Db2 | 지원 시작 |
Vitess | 대규모 분산 DB |
Cassandra | 커뮤니티 기반 시도 중 |
Spanner | 일부 실험적 |
Outbox Pattern | RDB 외부 시스템 연동 간접 방법 |
6. flink와 LakeHouse format 관계
이후 포스트에도 flink와 lakehouse format에 대해서 자세히 다뤄보겠지만, 이 3가지는 매우 밀접한 관계가 있고 데이터 스트리밍 파이프라인에서 서로 유기적으로 연결된 구성 요소입니다.
1) CDC 흐름
- source DB로부터 Debezium 혹은 Flink cdc connector를 통해서 Kafka에 Message로써 저장한다.
- Kafka에 저장된 변경 log를 spark streaming 혹은 flink를 이용하여 data lake에 parquet 파일로 생성한다.
- Iceberg와 같은 Lakehouse format으로 실시간으로 변경되는 데이터를 새로운 version으로 파일을 생성하여 테이블에 적용한다.
즉, 원본 데이터로부터 변경되는 데이터를 CDC로 감지 해야만 뒷단의 flink로 반영한 CDC 데이터를 Lakehouse format table에 적용하는 것이 의미있다고 볼 수 있겠습니다.
물론 rdb와 rdb의 싱크를 맞추기 위해서 debezium - kafka를 이용할 수도 있습니다. 그렇게 되면 뒤의 sink connector가 jdbc connector가 될 수도 있겠습니다.
2) 각 단계를 대처하는 tool
단계 | 주요 역할 | 대표 도구 | 대체 / 보완 도구 |
Source (운영 DB 등) | 데이터 변경이 발생하는 원본 시스템 | MySQL, PostgreSQL, SQL Server 등 | Oracle, MongoDB, Cassandra 등 |
CDC 추출 도구 | DB 변경 로그 추출 | Debezium (Kafka Connect 기반) | Flink CDC Connector |
Streaming Framework | 실시간 데이터 처리 및 변환 | Apache Flink | Apache Spark Structured Streaming, Kafka Streams |
중간 전송 (Broker) | 데이터 전달 및 버퍼링 | Apache Kafka | Apache Pulsar, Redpanda, RabbitMQ, AWS Kinesis 등 |
Lakehouse Format 저장 | S3/HDFS 등에 실시간 적재 및 CDC 통합 저장 | Apache Iceberg, Delta Lake, Apache Hudi | 포맷에 따라 선택 가능 (Athena 호환 필요 시 Iceberg/Delta 추천) |
댓글