본문 바로가기
BigData/Flink

[Flink] Flink가 CDC에 강력한 이유, SparkSteaming과 비교, 코드 예제(feat. Debezium, Iceberg)

by 스파이디웹 2025. 6. 22.
728x90

1. Flink가 CDC에 있어서 강력한 이유

1) 진정한 스트림 처리 엔진 (Event-at-a-time, Low Latency)

  • Flink는 레코드 단위(event 단위)로 즉시 처리하는 ‘진짜 스트리밍’ 처리 구조
  • 반면 Spark Structured Streaming은 마이크로배치(몇 백 ms~초 단위) 처리로, 처리 지연(latency)이 크고 실시간 반응 속도가 떨어짐
  • Kafka Connect + Debezium처럼 CDC 이벤트를 실시간으로 받아 즉각 반영해야 하는 경우 Flink가 훨씬 적합

2) 강력한 상태 관리(Stateful Processing)

  • Flink는 상태 저장소(State Backend)를 이용해 이벤트 간 상태를 유지할 수 있음
  • CDC 데이터는 여러 이벤트가 특정 키(예: PK)에 대해 여러 상태 변화(Insert, Update, Delete)를 포함하는데, Flink는 이를 효과적으로 관리하며 정확한 현재 상태를 계산할 수 있음
  • 다른 프레임워크는 상태 관리를 자체 구현하거나 제한적인 상태 저장만 지원

3) Exactly-once 처리 보장 (Event Processing Guarantee)

  • Flink는 체계적인 Checkpointing, Savepoint, Two-phase commit 프로토콜을 지원해, Source부터 Sink까지 End-to-End exactly-once 처리를 강력하게 보장
  • CDC 처리에 있어 중복 데이터 처리나 누락 없이 정확한 데이터 반영이 필수적인데, Flink가 이를 네이티브로 제공
  • Kafka Streams나 Spark Structured Streaming은 조건부로 exactly-once를 보장하지만, Flink만큼 완전하지 않고 복잡한 설정이 필요

4) 내장된 CDC 커넥터 및 통합 (Debezium 연동 최적화)

제일 중요한 부분이라 생각되는 특징입니다.

  • Flink CDC 커넥터는 Debezium을 내부적으로 포함하거나 연동해 여러 DB(MySQL, PostgreSQL, MongoDB 등)의 CDC 이벤트를 네이티브하게 처리
  • CDC 이벤트의 before/after 데이터와 op 타입(Insert, Update, Delete)을 자동 해석하여, 사용자가 별도 파싱 없이 자동으로 Upsert, Delete 연산을 수행하도록 지원
  • 다른 프레임워크는 보통 Kafka Connect + 별도 소비 로직 구현이 필요

5) 강력한 Event Time & Watermark 지원

  • Flink는 복잡한 event-time 처리, 지연 이벤트 허용, watermark 전략을 유연하게 제공하여, CDC 이벤트가 순서대로 도착하지 않거나, 네트워크 지연이 있어도 정확한 상태 반영이 가능
  • Spark는 watermark 지원이 있지만 Flink만큼 다양하고 유연하지 않음
  • Kafka Streams는 제한적이며, CDC에서 event time 기반 정합성 유지가 어려울 수 있음

6) 유연한 상태 집계 및 복잡한 처리 지원

  • Flink는 CEP(Complex Event Processing), Windowing, 조인, 패턴 매칭 등 고급 스트림 연산을 실시간으로 처리 가능
  • CDC 데이터 처리 시 여러 테이블 간 동기화, 데이터 정합성 체크, 복잡한 비즈니스 로직 적용에 강력
  • Spark는 주로 batch/마이크로배치에 적합하며, CEP나 실시간 복잡한 상태 관리는 어려움

7) 대규모 확장성 & 고가용성

  • Flink는 분산 처리 및 장애 복구를 강력히 지원하며, 수백 ~ 수천 노드로 확장 가능
  • CDC 데이터는 매우 큰 규모일 수 있기 때문에, Flink의 확장성은 큰 장점
  • Kafka Streams는 JVM 내 단일 노드 기반이며, Spark는 클러스터 단위지만 마이크로배치 특성으로 인해 확장 지연이 있을 수 있음

2. Flink와 Spark Streaming 비교

Flink를 대체할 수 잇는 프레임워크 중 하나가 Spark Streaming인데, 이 2개를 비교하고 언제 SparkStreaming을 사용할 수 있을지 확인해보겠습니다.

1) 처리 제어 수준

Spark Structured Streaming은 다음 조건이 충족되면 exactly-once semantics를 지원

조건 설명
Source가 재처리 지원 (예: Kafka + offset tracking) Kafka source는 offset 기반으로 상태 관리
Sink가 idempotent or upsert 가능 Iceberg, Delta Lake, Hudi 등
checkpointLocation 설정 상태 저장 및 recovery 지원
.writeStream \
  .option("checkpointLocation", "/chk/path") \
  .start()

 

Flink는 checkpoint + 상태관리로 exactly once를 지원

  • Flink는 일정 간격으로 전체 파이프라인의 상태를 체크포인팅
  • Sink 연산(예: Kafka, DB 등)도 2-phase commit 또는 idempotent 방식으로 구성
  • 실패 시 체크포인트 지점부터 복구하여, 중복 없이 재처리

2) WaterMark, Checkpoint 지원 여부

Spark Structured Streaming

  • withWatermark("event_time", "10 minutes") 형태로 event-time 처리 가능
  • checkpointLocation 설정 시 상태와 offset 저장
 
df.withWatermark("event_time", "5 minutes")
  • watermark는 aggregation, window join 등에서 주로 사용
  • 늦은 이벤트 처리 가능

Flink

  • 더 세밀한 watermark 설정 가능 (boundedOutOfOrderness, punctuated 등)
  • Checkpoint도 exactly-once recovery, stateful operator 수준까지 보존

3) Iceberg Sink 지원 여부

Spark Streaming은 3.1+ 버전의 경우 Iceberg를 지원하고 Flink 또한 Iceberg으로 write하는 것을 지원합니다.

cdc_df.writeStream \
  .format("iceberg") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/chk/") \
  .start("catalog.db.target_table")

 


4) 마이크로배치 vs 실시간 스트리밍

항목 Spark Structured Streaming Flink
스트리밍 방식 마이크로배치 (기본 500ms~수초 단위) True 스트리밍 (레코드 단위 처리)
지연 시간 수초 이상 수 밀리초 수준
처리 모델 정기적으로 Trigger (Batch-like) 이벤트가 도착하는 즉시 처리
Upsert/Merge 직접 MERGE 구현 필요 CDC-aware sink (Hudi/Iceberg)에서 자동 지원
State 저장 checkpoint, state store 사용 state backend + checkpoint, more granular
Watermark 지원 (event-time 처리 가능) 정교한 watermark, allowed lateness 등 풍부
Source 지원 Kafka, Files, JDBC 등 Kafka, Debezium, JDBC CDC, etc.
Sink 지원 Iceberg, Delta, Hudi 등 Iceberg, Hudi, S3, JDBC, etc.

 


5) CDC 처리

Flink는 CDC에 특화되어서 upsert-enabled sink를 사용하면 자동으로 RowKind.UPDATE_BEFORE, UPDATE_AFTER 같은 걸 처리하지만,
Spark에서는 그걸 직접 감지하고 처리해야 함

 

  • Spark Streaming은 기본적으로 append or complete output mode만 지원
  • CDC는 insert, update, delete가 섞여 있으므로,
  • 결국 MERGE INTO를 써서 기존 테이블과 병합해야 upsert / delete 처리가 가능

 

spark.sql("""
MERGE INTO my_catalog.db.target_table t
USING my_catalog.db.staging_table s
ON t.id = s.id
WHEN MATCHED AND s.op = 'd' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.name = s.name
WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name)
""")

6) 언제 sparkstreaming을 사용할까?

  • Spark는 기존 배치 파이프라인과 쉽게 통합됨, 기존 spark에 익숙한 조직이나 팀원이 있을 때
  • 실시간 Latency가 그렇게 중요하지 않을 때

사실 세밀한 설정이나, 처리 제어의 완벽한 보장 그리고 자동CDC, record 단위의 실시간 스트리밍을 감안했을 때 Flink가 성능적으로는 무조건 나음


3. Flink 예제 코드

1) Flink SQL

-- 1) CDC Source 테이블 생성 (MySQL 예제)
CREATE TABLE mysql_binlog_source (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'user',
  'password' = 'password',
  'database-name' = 'mydb',
  'table-name' = 'my_table'
);

-- 2) Sink 테이블 (예: Kafka, Iceberg 등)
CREATE TABLE kafka_sink (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'cdc_output_topic',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- 3) CDC 데이터 바로 전송
INSERT INTO kafka_sink
SELECT id, name FROM mysql_binlog_source;

2) Datastream API

import com.ververica.cdc.connectors.mysql.MySQLSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.iceberg.flink.sink.IcebergSink;
import org.apache.iceberg.flink.TableLoader;

import java.util.Properties;

public class FlinkCDCToIceberg {
    public static void main(String[] args) throws Exception {
        // 1. 실행 환경 생성
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. MySQL CDC Source 생성 (JSON 형태 수신)
        MySQLSource<String> source = MySQLSource.<String>builder()
            .hostname("mysql-host")
            .port(3306)
            .databaseList("mydb")
            .tableList("mydb.my_table")
            .username("user")
            .password("password")
            .deserializer(new JsonDebeziumDeserializationSchema()) // CDC 이벤트 JSON 수신
            .startupOptions(StartupOptions.latest())
            .build();

        // 3. CDC 스트림 생성
        DataStreamSource<String> cdcStream = env.fromSource(
            source,
            WatermarkStrategy.noWatermarks(),
            "MySQL CDC Source"
        );

        // 4. JSON -> POJO 변환 (예시 User POJO)
        DataStream<User> userStream = cdcStream
            .map(json -> parseJsonToUser(json))
            .returns(User.class);

        // 5. Iceberg 테이블 로더 설정 (S3 경로, 카탈로그 등)
        TableLoader tableLoader = TableLoader.fromHadoopTable("s3://bucket/iceberg/mydb/my_table");

        // 6. Iceberg Sink 생성
        IcebergSink<User> icebergSink = IcebergSink
            .forRow(userStream)
            .tableLoader(tableLoader)
            .overwrite(false)
            .build();

        // 7. Iceberg Sink 추가
        userStream.sinkTo(icebergSink);

        // 8. 실행 시작
        env.execute("Flink CDC to Iceberg");
    }

    // JSON String -> User 객체 변환 (간단 예시)
    private static User parseJsonToUser(String json) {
        // Jackson, Gson 등 사용하여 파싱
        // 예: {"id":1,"name":"Alice","op":"c"}
        // op에 따라 insert/update/delete 처리 가능하게 POJO 확장 필요
        // 간단하게 여기서는 insert/update로 처리한다고 가정
        return new User(...);
    }

    // User POJO 예시
    public static class User {
        public int id;
        public String name;
        public String op; // c=insert, u=update, d=delete 등

        // 생성자, getter/setter 필요
    }
}
728x90

댓글