728x90
스트리밍 데이터를 다룰 때 꼭 필요한 개념인 이벤트 시간 처리와 상태 기반 처리에 대해서 정리하겠습니다.
1. 이벤트 시간 처리
- DStream API는 이벤트 시간과 관련된 처리 정보를 제공하지 않음
- 이벤트 시간:
- 이벤트 시간은 데이터에 기록되어 있는 시간
- 대부분의 경우 이벤트가 실제로 일어난 시간을 의미
- 이벤트를 다른 이벤트와 비교하는 더 강력한 방법을 제공하기 때문에 사용해야 함
- 이때 지연되거나 무작위로 도착하는 이벤트를 해결해야 한다는 문제점이 있음
- 스트림 처리 시스템은 지연되거나 무작위로 도착한 데이터를 반드시 제어할 수 있어야 함
- 처리 시간:
- 처리 시간은 스트림 처리 시스템이 데이터를 실제로 수신한 시간
- 세부 구현과 관련된 내용이므로 이벤트 시간보다 덜 중요
- 이벤트 시간처럼 외부 시스템에서 제공하는 것이 아니라 스트리밍 시스템이 제공하는 속성이므로 순서가 뒤섞이지 않음
- 거리 및 네트워크 상태에 따라 달라지는 시간이므로 신뢰도가 낮은 데이터이므로 사용하기 어려움
2. 상태 기반 처리
- 오랜 시간에 걸쳐 중간 처리 정보(상태)를 사용하거나 갱신하는 경우(마이크로 배치 또는 레코드 단위 처리)에만 필요
- 이벤트 시간을 사용하거나 키에 대한 집계를 사용하는 상황에서 일어나지만, 집계키가 반드시 이벤트 시간과 연관성을 가져야 하는 것은 아님
- 상태 기반 연산에 필요한 복잡한 처리를 대신함
- 상태 기반 연산에 필요한 중간 상태 정보를 상태 저장소(인메모리 상태 저장소)에 저장 함
- 인메모리 상태 저장소는 중간 상태를 체크포인트 디렉터리에 저장해 내고장성을 보장
3. 임의적인 상태 기반 처리
- 상태의 유형, 갱신 방법 그리고 제거 시점에 따라(명시적으로 또는 타임아웃 조건으로) 세밀한 제어가 필요할 때 사용하는 처리(사용자 정의 상태 기반 처리라고도 함)
- ex) 전자 상거래 사이트에서 사용자 세션 정보를 기록
- 웹 앱에서 사용자 세션별로 오류가 5번 발생했을 경우 오류 보고 → 카운트 기반 윈도우를 사용
- 중복 이벤트를 계속해서 제거해야 하는 경우 → 과거의 모든 레코드를 추적해서 중복 데이터 제거
- 상태 기반 처리
- 특정 키의 개수를 기반으로 윈도우 생성하기
- 특정 시간 범위 안에 일정 개수 이상의 이벤트가 있는 경우 알림 발생시키기
- 결정되지 않은 시간 동안 사용자 세션을 유지하고 향후 분석을 위해 세션 저장하기
- 임의적인 상태 기반 처리를 수행하면 결과적으로 2가지 유형을 만남
- 데이터의 각 그룹에 맵 연산을 수행하고 각 그룹에서 최대 1개의 로우를 만들어 냄(mapGroupWithState API)
- 데이터의 각 그룹에 맵 연산을 수행하고 각 그룹에서 1개 이상의 로우를 만들어 냄(flatMapGroupsWithState API)
1) 타임 아웃
- 중간 상태를 제거하기 전에 기다려야 하는 시간
- 각 키별로 그룹이 존재한다고 했을 때 타임아웃은 전체 그룹에 대한 전역 파라미터로 동작
- 처리 시간(GroupStateTimeout.ProcessingTimeTimeout)이나 이벤트 시간(GroupStateTimeout.EventTimeTimeout) 중 하나
- 처리 시간 기반의 타임아웃을 사용할 때는 GroupState.setTimeoutDuration으로 주기를 설정할 수 있음
- 처리 시간 기반의 타임아웃은 시스템 시간의 변화에 영향을 받으므로 시간대 변경과 시간 지연을 잘 고려해야 함
- 이벤트 시간 기준의 타임아웃을 사용할 때도 쿼리에 이벤트 시간 워터마크를 반드시 지정해야 함(워터마크가 타임스탬프를 초과했을 때 발생)
2) 출력 모드
- mapGroupsWithState - update 모드
- flatMapGroupsWithState - append와 update 모드
- append모드는 타임아웃 이후에 결과 셋에서 데이터를 볼 수 있음(워터마크를 지나야 함)
3) mapGroupsWithState
- 상태 기반 처리, 갱신된 데이터셋을 입력으로 받고 값을 특정 키로 분배하는 사용자 정의 집계 함수와 유사
- 세 가지 클래스 정의: 입력 클래스, 상태 클래스, 출력 클래스(선택적으로 정의)
- 키, 이벤트 이터레이터 그리고 이전 상태를 기반으로 상태를 갱신하는 함수
- 타임아웃 파라미터
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types._
object MapGroupsWithStateExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("MapGroupsWithStateExample")
.master("local[2]")
.getOrCreate()
// 🔹 스키마 정의
val schema = new StructType()
.add("id", IntegerType)
.add("value", IntegerType)
// 🔹 입력 스트리밍 데이터 읽기
val inputStream = spark.readStream
.schema(schema)
.csv("C:/path/to/input")
// 🔹 상태를 유지할 그룹별 함수 정의
def updateState(key: Int, values: Iterator[Int], state: GroupState[Int]): Iterator[(Int, Int)] = {
val currentState = state.getOption.getOrElse(0) // 이전 상태를 가져옴
val newValue = values.sum + currentState // 새로운 값으로 상태를 업데이트
state.update(newValue) // 상태 업데이트
Iterator((key, newValue)) // 결과를 반환
}
// 🔹 mapGroupsWithState 사용
val result = inputStream
.groupByKey(row => row.getAs[Int]("id"))
.mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateState)
// 🔹 콘솔로 출력
val query = result.writeStream
.outputMode("update")
.format("console")
.start()
query.awaitTermination()
}
}
4) flatMapGroupsWithState
- 상태 기반 처리, 단일 키의 출력 결과가 여러 개 만들어지는 것을 제외하면 mapGroupsWithState와 매우 유사
- 세 가지 클래스 정의: 입력 클래스, 상태 클래스, 출력 클래스(선택적으로 정의)
- 키, 이벤트 이터레이터 그리고 이전 상태를 기반으로 상태를 갱신하는 함수
- 타임아웃 파라미터
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object FlatMapGroupsWithStateExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("FlatMapGroupsWithStateExample")
.master("local[2]")
.getOrCreate()
// 🔹 스키마 정의
val schema = new StructType()
.add("id", IntegerType)
.add("value", IntegerType)
// 🔹 입력 스트리밍 데이터 읽기
val inputStream = spark.readStream
.schema(schema)
.csv("C:/path/to/input")
// 🔹 상태를 유지할 그룹별 함수 정의
def updateState(key: Int, values: Iterator[Int], state: GroupState[Int]): Iterator[(Int, Int)] = {
val currentState = state.getOption.getOrElse(0) // 이전 상태를 가져옴
val newValue = values.sum + currentState // 새로운 값으로 상태를 업데이트
state.update(newValue) // 상태 업데이트
Iterator((key, newValue), (key, newValue * 2)) // 여러 개의 레코드 반환
}
// 🔹 flatMapGroupsWithState 사용
val result = inputStream
.groupByKey(row => row.getAs[Int]("id"))
.flatMapGroupsWithState(GroupStateTimeout.NoTimeout)(updateState)
// 🔹 콘솔로 출력
val query = result.writeStream
.outputMode("update")
.format("console")
.start()
query.awaitTermination()
}
}
4. 워터 마크로 지연 데이터 제어하기
- 얼마나 늦게 도착한 데이터까지 받아들일지 기준을 정할 때 사용
- 특정 시간 이후에 처리에서 제외할 이벤트나 이벤트 집합에 대한 시간 기준
- structured streaming에만 있는 특징
- ex) 10분으로 지정하면 이전 이벤트보다 10분 전에 일어난 모든 이벤트를 무시함(10분 이내의 이벤트만 받아들임)
- 워터마크를 지정하지 않으면 전체 윈도우의 데이터를 영원히 유지하면서 결과를 갱신함
import org.apache.spark.sql.functions.{window, col}
withEventTime.withWatermark("event_time", "5 hours")
.groupBy(window(col("event_Time"), "10 minutes", "5 minutes"))
.count()
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("complete")
.start()
5. 스트림에서 중복 데이터 제거
- structured streaming은 최소 한번(at-least-once)처리하는 방식을 제공하는 메시지 시스템을 쉽게 사용할 수 있음
- 처리시에 키를 기준으로 중복을 제거해 정확히 한번 처리(exactly once)방식을 지원함
- 스파크는 데이터 중복을 제거하기 위해 사용자가 지정한 키를 유지하면서 중복 여부를 확인함
- 중복 이라고 판단되는 조건을 명시하여 제거하는 것이 필요
// 사용자와 타임스탬프가 같은 데이터를 중복으로 판단
import org.apache.spark.sql.functions.expr
withEventTime.withWatermart("event_time", "5 seconds")
.dropDuplicates("User", "event_time")
.groupBy("User")
.count()
.writeStream()
.queryName("deduplicated")
.format("memory")
.outputMode("complete")
.start()
728x90
'BigData > Spark & Spark Tuning' 카테고리의 다른 글
[Spark] Spark Streaming 운영 환경에서의 Structured Streaming (0) | 2025.02.10 |
---|---|
[Spark] Spark streaming readStream, writeStream format, option, mode 및 config 정리 (0) | 2025.02.07 |
[Spark] Spark Streaming, Structured Streaming 기초 정리 (0) | 2025.02.06 |
[Spark] EMR을 구성하는 instance는 큰 spec이 유리할까? 작은 spec이 유리할까? (0) | 2025.01.14 |
[Spark] 프로그래밍 언어 별 Spark 성능, 속도 차이 (UDF와 직렬화) (0) | 2025.01.10 |
댓글