728x90
이번 포스트에는 spark streaming 고수준 API인 structured streaming에 대해서 정리해보겠습니다.
1. Structured Streaming 개요 및 특징
- Structured streaming은 spark SQL 엔진 기반의 스트림 처리 프레임워크(spark의 structured API인 Dataframe, Dataset 그리고 SQL를 사용함)
- 스트리밍 연산은 배치 연산과 동일하게 표현함, 사용자가 스트림 처리용 코드와 목적지를 정의하면 structured streaming 엔진에서 신규 데이터에 대한 증분 및 연속형 쿼리를 실행하고, 코드 생성, 쿼리 최적화 등의 기능을 지원하는 카탈리스트 엔진을 사용해 연산에 대한 논리적 명령을 처리함
- 종합적이며, 정확히 한 번 처리 방식(exactly once)뿐만 아니라 check pointing과 WAL(write-ahead log)을 이용한 내고장성을 제공
내고장성이란?
내고장성(Fault-Tolerant)이란, 고장이나 결함이 발생해도 정상적인 동작을 유지하는 시스템을 의미
- 스트림 데이터를 데이터가 계속해서 추가되는 테이블처럼 다루는것이 핵심 아이디어
- 배치처리나 스트림 처리와 관련된 쿼리 구문을 변경하지 않고, 내부적으로 사용자의 쿼리를 어떻게 증분 처리할지 자동으로 파악함 → 내고장성을 보장하면서 신규 데이터가 유입될 때마다 효율적으로 처리 결과를 갱신
연속형 애플리케이션
스파크의 나머지 기능과 구조적 스트리밍을 통합해 연속형 애플리케이션을 구축할 수 있으며,
스트리밍 작업, 배치 작업, 스트리밍과 오프라인 데이터의 조인 그리고 대화형 비정형 쿼리의 실행을 조합해 데이터에 실시간으로 반응하는 통합 빅데이터 처리 애플리케이션을 의미함
2. Structured Streaming 핵심 개념
- transformation과 action을 가지고 있음
- input source
- apache kafka
- HDFS나 S3등 분산 파일 시스템의 파일(스파크는 디렉터리의 신규 파일을 계속해서 읽음)
- 테스트용 소켓 소스
- sink
- 스트림의 결과를 저장할 목적지를 명시
- 싱크과 실행 엔진은 데이터 처리의 진행 상황을 신뢰도 있고 정확하게 추적하는 역할을 함
- output sink
- apache kafka
- 거의 모든 파일 포멧
- 출력 레코드에 임의 연산을 실행하는 foreach sink
- 테스트용 콘솔 sink
- 디버깅용 메모리 sink
- 출력 모드
- append: 싱크에 신규 레코드만 추가
- update: 변경 대상 레코드 자체를 갱신
- complete: 전체 출력 내용 rewrite
- 트리거
- 출력 모드(데이터 출력 방식 정의) ↔ 트리거(데이터 출력 시점 정의)
- structured streaming에서 언제 신규 데이터를 확인하고 결과를 갱신할지 정의
- ex) structured streaming은 기본적으로 마지막 입력 데이터를 처리한 직후에 신규 입력 데이터를 조회해 최단 시간 내에 새로운 처리 결과를 만들어내지만, 이런 동작 방식 때문에 파일 싱크를 사용하는 경우 작은 크기의 파일이 여러 개 생길 수 있음
- 이벤트 시간 처리
- 이벤트 시간 기준의 처리: 무작위로 도착한 레코드 내부에 기록된 타임스탬프 기준으로 처리
- 스파크는 데이터가 유입된 시간이 아니라 데이터 생성 시간을 기준으로 처리함, 따라서 데이터가 늦게 업로드되거나 네트워크 지연으로 데이터의 순서가 뒤섞인 채 시스템으로 들어와도 처리할 수 있음
- 시스템은 입력 테이블로 인식하므로 인식하므로 이벤트 시간은 테이블에 있는 하나의 컬럼일 뿐
- 표준 SQL이 내부적으로 이벤트 시간 필드를 인식하면 쿼리 실행 최적화나 타임 윈도우에서 상태 정보의 제거 시점을 결정하는 등 특별한 작업을 수행할 수 있고, 이럴 때 워터마크를 사용
- 이벤트 시간: 데이터에 기록된 시간 필드를 의미
- 처리 시간 기준 처리: 스트리밍 application에 레코드가 도착한 시간을 기반으로 처리하는 방식을 의미
- 워터마크: 시간 제한을 설정할 수 있는 스트리밍 시스템의 기능
- 늦게 들어온 이벤트를 어디까지 처리할지 시간을 제한 함
- 이벤트 시간 처리를 지원하는 여러 시스템은 과거 데이터의 보관 주기를 제한하기 위해 워터마크를 사용
- 이벤트 시간 기준의 처리: 무작위로 도착한 레코드 내부에 기록된 타임스탬프 기준으로 처리
3. Structured Streaming option 및 hands on
옵션에 관해서는 아래 링크를 참조
https://spidyweb.tistory.com/577
실제 로컬에서 테스트해보는 hands on 게시글은 아래 링크를 참조
https://spidyweb.tistory.com/578
4. Stream Transformation
- 정적 Dataframe의 트랜스포메이션을 대부분 포함함
- 모든 함수와 개별 컬럼 처리도 지원함
1) select & where(filter)
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("StreamingWhereSelectExample")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 스트리밍 데이터 (예제: JSON 파일에서 읽기)
val inputDf = spark.readStream
.format("json")
.option("path", "/tmp/streaming_input") // 감시할 디렉토리
.option("maxFilesPerTrigger", 1)
.load()
// 특정 조건(where) + 필요한 컬럼 선택(select)
val filteredDf = inputDf.where($"age" > 30).select("name", "age")
.writeStream
.queryName("select_filter")
.format("console")
.outputMode("append")
.start()
filteredDf.awaitTermination()
2) aggregation
- 데이터셋의 원시 컬럼에 대한 집계뿐만 아니라 스트림 처리에 특화된 이벤트 시간 컬럼 지정, 워터마크 그리고 윈도우 처리를 지원 함
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("StreamingWhereSelectExample")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 스트리밍 데이터 (예제: JSON 파일에서 읽기)
val inputDf = spark.readStream
.format("json")
.option("path", "/tmp/streaming_input") // 감시할 디렉토리
.option("maxFilesPerTrigger", 1)
.load()
val aggregatedDf = inputDf.groupBy("age").count().writeStream
.format("console")
.outputMode("complete") // 집계 결과는 complete 모드 필수
.start()
aggregatedDf.awaitTermination()
3) join
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
// 첫 번째 스트리밍 데이터 (User 정보)
val usersDf = spark.readStream
.format("json")
.option("path", "/tmp/streaming_users")
.load()
// 두 번째 스트리밍 데이터 (Transaction 정보)
val transactionsDf = spark.readStream
.format("json")
.option("path", "/tmp/streaming_transactions")
.load()
// `id` 컬럼을 기준으로 조인
val joinedDf = usersDf.join(transactionsDf, "id").writeStream
.format("console")
.outputMode("append")
.start()
joinedDf.awaitTermination()
5. Input & Output
1) 데이터를 읽고 쓰는 장소(source & sink)
- file source & sink
- 가장 간단한 소스, 동작 방식은 쉽게 추측하고 이해할 수 있음
- 실전에서는 parquet, text, JSON, CSV, Avro를 자주 사용함
- 정적 파일 source와의 유일한 차이점은 trigger 시 읽을 파일 수를 결정할 수 있다는 것(maxFilesPerTrigger)
- 모든 파일은 스트리밍 작업에서 바라보는 input directory에 원자적으로 추가되어야 함 → 그렇지 않으면 스파크에서 파일을 일부분만 처리
- 로컬 파일 시스템이나 HDFS에서는 부분 기록된 파일을 볼 수 있고, 외부 directory에 파일을 완전히 기록한 후 input directory로 옮겨야 함(AWS S3는 완전히 기록된 객체만 보임)
- kafka source & sink
- kafka는 pub/sub 방식의 분산형 시스템이고, 메시지 큐 방식처럼 레코드의 스트림을 publish(데이터를 읽는 동작)하고 subscribe(데이터를 쓰는 동작)하는 방식으로 사용함 (kafka입장에서 spark streaming은 consumer 이자 producer가 될 수 있음)
- 발행된 메시지는 내고장성을 보장하는 저장소에 저장되므로 분산형 버퍼로 생각할 수 있음
- record stream을 topic으로 불리는 카테고리에 저장함
- kafka의 각 record는 키, 값 그리고 타임스탬프로 구성됨
- topic은 순서를 바꿀 수 없는 레코드로 구성되며 레코드의 위치를 offset이라고 부름
- kafka는 pub/sub 방식의 분산형 시스템이고, 메시지 큐 방식처럼 레코드의 스트림을 publish(데이터를 읽는 동작)하고 subscribe(데이터를 쓰는 동작)하는 방식으로 사용함 (kafka입장에서 spark streaming은 consumer 이자 producer가 될 수 있음)
2) kafka source에서 메시지 읽기
- assign: 토픽뿐만 아니라 읽으려는 파티션까지 세밀하게 지정하는 옵션
- subscibe: 토픽 목록을 지정해 여러 토픽을 구독하는 옵션
- subscribePattern: 패턴을 지정해 여러 토픽을 구독하는 옵션
카프카 서비스에 접속하기 위해 option("kafka.bootstrap.servers") 값도 지정해야 함
이외의 옵션
- startingOffsets 및 endingOffsets
- 쿼리를 시작할 때 읽을 지점
- 가장 작은 offset부터 읽는 earliest나 가장 큰 오프셋 부터 읽는 latest 중 하나를 지정
- 또는 각 TopicPartition에 대한 시작 offset을 명시한 JSON 문자열을 사용해 지정
- 새로운 스트리밍 쿼리가 시작될 때만 적용되며, 다시 시작한 쿼리에서는 쿼리가 남긴 오프셋을 사용
- 쿼리 실행 중에 새롭게 발견한 파티션은 earliest 방식으로 읽음
- failOnDataLoss
- 데이터 유실(토픽이 삭제되거나 오프셋이 범위를 벗어났을 때)이 일어났을 때 쿼리를 중단할 것인지 지정
- 잘못된 정보를 줄 수 있으므로 원하는대로 동작하지 않는 경우 비활성화 가능(기본값 true)
- maxOffsetsPerTrigger
- 특정 트치거 시점에 읽을 오프셋의 전체 개수
kafka source의 schema
- 키: binary
- 값: binary
- 토픽: string
- 패턴: int
- 오프셋: long
- 타임스탬프: long
* 카프카에서 데이터를 읽거나 쓸 때는 JSON이나 Avro를 자주 사용함
3) kafka sink에 메세지 쓰기
kafka로 메시지를 publish하는 쿼리와 subscribe하는 쿼리는 몇 가지 파라미터를 제외하고 매우 비슷함
foreach sink
- Dataset API에 있는 foreachPartitions와 유사함
- 각 파티션에서 임의의 연산을 병렬로 수행
- 내고장성을 반드시 고려해야 함
- scala와 java에서는 foreach sink를 사용할 수 있음(ForeachWriter 인터페이스 구현)
- writer는 UDF나 Dataset 맵 함수처럼 반드시 Serializable 인터페이스를 구현해야 함
- 세 가지 메서드(open, process, close)는 각 executor에서 실행됨
- writer는 연결을 맺거나 트랜잭션을 시작하는 등의 모든 초기화 작업을 반드시 open 메서드에서 수행해야 함
흔히 발생할 수 있는 오류는 open 메서드가 아닌 부분에서 초기화를 수행해 executor가 아닌 driver에서 초기화되는 경우
테스트용 소스와 싱크
디버깅 및 테스트용 source 와 sink
- socket source
- 소켓 소스를 사용하면 TCP 소켓을 통해 스트림 데이터를 전송할 수 있음
- 데이터를 읽기 위한 호스트와 포트 지정이 필요
- 소켓이 드라이버에 있어 종단 간 내고장성을 보장 할 수 없으므로 운영 단계에서 사용하면 안 됨
- console sink
- 스트리밍 쿼리의 처리 결과를 콘솔로 출력할 때 사용
- 디버깅에는 유용, 내고장성은 지원하지 않음
- 스트리밍 쿼리의 일부 결과를 간단하게 콘솔로 출력하며, append와 complete 출력 모드를 지원
- memory sink
- 스트리밍 시스템을 테스트하는 데 사용하는 소스
- 콘솔 싱크와 유사하지만 콘솔에 출력하는 대신 드라이버에 데이터를 모은 후 대화형 쿼리가 가능한 메모리 테이블에 저장함
- 내고장성을 제공하지 않으므로 운영환경에서 사용하면 안 됨
- append와 complete 출력 모드를 지원
4) 데이터 출력 방법(outputMode)
- append mode
- 기본 동작 방식
- 새로운 로우가 결과 테이블에 추가되면 사용자가 명시한 트리거에 맞춰 싱크로 출력
- 내고장성을 보장하는 싱크를 사용한다는 가정 하에 모든 로우를 한 번만 출력
- 이벤트 시간과 워터마크를 append 모드와 함께 사용하면 최종 결과만 싱크로 출력
- complete mode
- 결과 테이블의 전체 상태를 싱크로 출력
- 모든 데이터가 계쏙해서 변경될 수 있는 일부 상태 기반 데이터를 다룰 때 유용(흔히 말하는 Master성 테이블)
- 사용 중인 싱크가 저수준 업데이트를 지원하지 않을 때에 유용
- update mode
- 이전 출력 결과에서 변경된 로우만 싱크로 출력한다는 점을 제외하면 complete 모드와 유사
- 이 모드를 지원하는 sink는 반드시 저수준 업데이트를 지원해야 함
- 쿼리에서 집계 연산을 하지 않으면 append 모드와 동일
5) 데이터 출력 시점(trigger)
- 출력하는 시점을 제어할 때 트리거를 설정
- structured streaming에서는 직전 트리거가 처리를 마치자마자 즉시 데이터를 출력
- 너무 많은 수정이 발생한다면 데이터를 출력할 때 트리거를 사용
- 싱크에 큰 분하가 발생하는 현상을 방지하거나 출력 파일의 크기를 제어하는 용도로 사용
- 처리 기간 기반의 주기형 트리거(periodic trigger)와 처리 단계를 수동으로 한 번만 실행할 수 있는 일회성 트리거(once trigger)를 제공함
처리 시간 기반 트리거
처리 주기를 문자열로 단순하게 지정(scala의 Duration이나 java의 TimeUnit도 사용 가능)
import org.apache.spark.sql.streaming.Trigger
activityCounts.writeStream.trigger(Trigger.ProcessingTime("100 seconds"))
.format("console").outputMode("complete").start()
일회성 트리거
- 개발 중에는 트리거에서 한 번에 처리할 수 있는 수준의 데이터로 application test가 가능
- 운영 환경에서는 자주 실행되지 않는 job을 수동으로 실행할 때 일회성 트리거를 사용
import org.apache.spark.sql.streaming.Trigger
activityCounts.writeStream.trigger(Trigger.Once())
.format("console").outputMode("complete").start()
최근에 추가된 소스를 확인하려면 아래 링크를 참조
https://bit.ly/2Ncpko2
728x90
'BigData > Spark & Spark Tuning' 카테고리의 다른 글
[Spark] Spark Streaming 이벤트 시간과 상태 기반 처리 정리 (0) | 2025.02.10 |
---|---|
[Spark] Spark streaming readStream, writeStream format, option, mode 및 config 정리 (0) | 2025.02.07 |
[Spark] EMR을 구성하는 instance는 큰 spec이 유리할까? 작은 spec이 유리할까? (0) | 2025.01.14 |
[Spark] 프로그래밍 언어 별 Spark 성능, 속도 차이 (UDF와 직렬화) (0) | 2025.01.10 |
[Spark] YARN vs k8s vs mesos 스파크 리소스 및 작업 스케줄링 (0) | 2025.01.07 |
댓글