본문 바로가기
BigData/Spark & Spark Tuning

[Spark] Spark Streaming 이벤트 시간과 상태 기반 처리 정리

by 스파이디웹 2025. 2. 10.
728x90

스트리밍 데이터를 다룰 때 꼭 필요한 개념인 이벤트 시간 처리와 상태 기반 처리에 대해서 정리하겠습니다.

 


1. 이벤트 시간 처리

  • DStream API는 이벤트 시간과 관련된 처리 정보를 제공하지 않음
  • 이벤트 시간:
    • 이벤트 시간은 데이터에 기록되어 있는 시간
    • 대부분의 경우 이벤트가 실제로 일어난 시간을 의미
    • 이벤트를 다른 이벤트와 비교하는 더 강력한 방법을 제공하기 때문에 사용해야 함
    • 이때 지연되거나 무작위로 도착하는 이벤트를 해결해야 한다는 문제점이 있음
    • 스트림 처리 시스템은 지연되거나 무작위로 도착한 데이터를 반드시 제어할 수 있어야 함
  • 처리 시간:
    • 처리 시간은 스트림 처리 시스템이 데이터를 실제로 수신한 시간
    • 세부 구현과 관련된 내용이므로 이벤트 시간보다 덜 중요
    • 이벤트 시간처럼 외부 시스템에서 제공하는 것이 아니라 스트리밍 시스템이 제공하는 속성이므로 순서가 뒤섞이지 않음
    • 거리 및 네트워크 상태에 따라 달라지는 시간이므로 신뢰도가 낮은 데이터이므로 사용하기 어려움

2. 상태 기반 처리

  • 오랜 시간에 걸쳐 중간 처리 정보(상태)를 사용하거나 갱신하는 경우(마이크로 배치 또는 레코드 단위 처리)에만 필요
  • 이벤트 시간을 사용하거나 키에 대한 집계를 사용하는 상황에서 일어나지만, 집계키가 반드시 이벤트 시간과 연관성을 가져야 하는 것은 아님
  • 상태 기반 연산에 필요한 복잡한 처리를 대신함
    • 상태 기반 연산에 필요한 중간 상태 정보를 상태 저장소(인메모리 상태 저장소)에 저장 함
    • 인메모리 상태 저장소는 중간 상태를 체크포인트 디렉터리에 저장해 내고장성을 보장

3. 임의적인 상태 기반 처리

  • 상태의 유형, 갱신 방법 그리고 제거 시점에 따라(명시적으로 또는 타임아웃 조건으로) 세밀한 제어가 필요할 때 사용하는 처리(사용자 정의 상태 기반 처리라고도 함)
    • ex) 전자 상거래 사이트에서 사용자 세션 정보를 기록
    • 웹 앱에서 사용자 세션별로 오류가 5번 발생했을 경우 오류 보고 → 카운트 기반 윈도우를 사용
    • 중복 이벤트를 계속해서 제거해야 하는 경우 → 과거의 모든 레코드를 추적해서 중복 데이터 제거
  • 상태 기반 처리
    • 특정 키의 개수를 기반으로 윈도우 생성하기
    • 특정 시간 범위 안에 일정 개수 이상의 이벤트가 있는 경우 알림 발생시키기
    • 결정되지 않은 시간 동안 사용자 세션을 유지하고 향후 분석을 위해 세션 저장하기
  • 임의적인 상태 기반 처리를 수행하면 결과적으로 2가지 유형을 만남
    • 데이터의 각 그룹에 맵 연산을 수행하고 각 그룹에서 최대 1개의 로우를 만들어 냄(mapGroupWithState API)
    • 데이터의 각 그룹에 맵 연산을 수행하고 각 그룹에서 1개 이상의 로우를 만들어 냄(flatMapGroupsWithState API)

1) 타임 아웃

  • 중간 상태를 제거하기 전에 기다려야 하는 시간
  • 각 키별로 그룹이 존재한다고 했을 때 타임아웃은 전체 그룹에 대한 전역 파라미터로 동작
  • 처리 시간(GroupStateTimeout.ProcessingTimeTimeout)이나 이벤트 시간(GroupStateTimeout.EventTimeTimeout) 중 하나
  • 처리 시간 기반의 타임아웃을 사용할 때는 GroupState.setTimeoutDuration으로 주기를 설정할 수 있음
  • 처리 시간 기반의 타임아웃은 시스템 시간의 변화에 영향을 받으므로 시간대 변경과 시간 지연을 잘 고려해야 함
  • 이벤트 시간 기준의 타임아웃을 사용할 때도 쿼리에 이벤트 시간 워터마크를 반드시 지정해야 함(워터마크가 타임스탬프를 초과했을 때 발생)

2) 출력 모드

  • mapGroupsWithState - update 모드
  • flatMapGroupsWithState - append와 update 모드
  • append모드는 타임아웃 이후에 결과 셋에서 데이터를 볼 수 있음(워터마크를 지나야 함)

3) mapGroupsWithState

  • 상태 기반 처리, 갱신된 데이터셋을 입력으로 받고 값을 특정 키로 분배하는 사용자 정의 집계 함수와 유사
    • 세 가지 클래스 정의: 입력 클래스, 상태 클래스, 출력 클래스(선택적으로 정의)
    • 키, 이벤트 이터레이터 그리고 이전 상태를 기반으로 상태를 갱신하는 함수
    • 타임아웃 파라미터
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types._

object MapGroupsWithStateExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("MapGroupsWithStateExample")
      .master("local[2]")
      .getOrCreate()

    // 🔹 스키마 정의
    val schema = new StructType()
      .add("id", IntegerType)
      .add("value", IntegerType)

    // 🔹 입력 스트리밍 데이터 읽기
    val inputStream = spark.readStream
      .schema(schema)
      .csv("C:/path/to/input")

    // 🔹 상태를 유지할 그룹별 함수 정의
    def updateState(key: Int, values: Iterator[Int], state: GroupState[Int]): Iterator[(Int, Int)] = {
      val currentState = state.getOption.getOrElse(0)  // 이전 상태를 가져옴
      val newValue = values.sum + currentState        // 새로운 값으로 상태를 업데이트
      state.update(newValue)                          // 상태 업데이트
      Iterator((key, newValue))                       // 결과를 반환
    }

    // 🔹 mapGroupsWithState 사용
    val result = inputStream
      .groupByKey(row => row.getAs[Int]("id"))
      .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateState)

    // 🔹 콘솔로 출력
    val query = result.writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

4) flatMapGroupsWithState

  • 상태 기반 처리, 단일 키의 출력 결과가 여러 개 만들어지는 것을 제외하면 mapGroupsWithState와 매우 유사
    • 세 가지 클래스 정의: 입력 클래스, 상태 클래스, 출력 클래스(선택적으로 정의)
    • 키, 이벤트 이터레이터 그리고 이전 상태를 기반으로 상태를 갱신하는 함수
    • 타임아웃 파라미터
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object FlatMapGroupsWithStateExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("FlatMapGroupsWithStateExample")
      .master("local[2]")
      .getOrCreate()

    // 🔹 스키마 정의
    val schema = new StructType()
      .add("id", IntegerType)
      .add("value", IntegerType)

    // 🔹 입력 스트리밍 데이터 읽기
    val inputStream = spark.readStream
      .schema(schema)
      .csv("C:/path/to/input")

    // 🔹 상태를 유지할 그룹별 함수 정의
    def updateState(key: Int, values: Iterator[Int], state: GroupState[Int]): Iterator[(Int, Int)] = {
      val currentState = state.getOption.getOrElse(0)  // 이전 상태를 가져옴
      val newValue = values.sum + currentState        // 새로운 값으로 상태를 업데이트
      state.update(newValue)                          // 상태 업데이트
      Iterator((key, newValue), (key, newValue * 2))  // 여러 개의 레코드 반환
    }

    // 🔹 flatMapGroupsWithState 사용
    val result = inputStream
      .groupByKey(row => row.getAs[Int]("id"))
      .flatMapGroupsWithState(GroupStateTimeout.NoTimeout)(updateState)

    // 🔹 콘솔로 출력
    val query = result.writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

4. 워터 마크로 지연 데이터 제어하기

  • 얼마나 늦게 도착한 데이터까지 받아들일지 기준을 정할 때 사용
  • 특정 시간 이후에 처리에서 제외할 이벤트나 이벤트 집합에 대한 시간 기준
  • structured streaming에만 있는 특징
  • ex) 10분으로 지정하면 이전 이벤트보다 10분 전에 일어난 모든 이벤트를 무시함(10분 이내의 이벤트만 받아들임)
  • 워터마크를 지정하지 않으면 전체 윈도우의 데이터를 영원히 유지하면서 결과를 갱신함
import org.apache.spark.sql.functions.{window, col}

withEventTime.withWatermark("event_time", "5 hours")
			 .groupBy(window(col("event_Time"), "10 minutes", "5 minutes"))
             .count()
             .writeStream
             .queryName("events_per_window")
             .format("memory")
             .outputMode("complete")
             .start()

5. 스트림에서 중복 데이터 제거

  • structured streaming은 최소 한번(at-least-once)처리하는 방식을 제공하는 메시지 시스템을 쉽게 사용할 수 있음
    • 처리시에 키를 기준으로 중복을 제거해 정확히 한번 처리(exactly once)방식을 지원함
    • 스파크는 데이터 중복을 제거하기 위해 사용자가 지정한 키를 유지하면서 중복 여부를 확인함
  • 중복 이라고 판단되는 조건을 명시하여 제거하는 것이 필요
// 사용자와 타임스탬프가 같은 데이터를 중복으로 판단
import org.apache.spark.sql.functions.expr

withEventTime.withWatermart("event_time", "5 seconds")
			 .dropDuplicates("User", "event_time")
             .groupBy("User")
             .count()
             .writeStream()
             .queryName("deduplicated")
             .format("memory")
             .outputMode("complete")
             .start()

 


 

728x90

댓글