BigData/Spark & Spark Tuning

[Spark] Spark Streaming 운영 환경에서의 Structured Streaming

스파이디웹 2025. 2. 10. 21:08
728x90

이번 포스트에는 spark streaming을 운영할 때 중요한 포인트를 정리 해보겠습니다.


1. 내고장성과 체크포인팅

  • structured streaming application은 단순히 재시작만으로 장애 상황을 극복
    • 스파크 엔진이 자동으로 관리하는 체크포인트와 WAL을 사용하도록 설정해야 함
    • 장애 상황이 발생하면 단순히 application을 다시 시작하여 중간 상탯값을 저장한 체크 포인트 경로를 참조하도록 설정, 이전 상태를 복구하고 중단된 데이터를 처리함
    • 체크포인트 디렉터리나 디렉터리의 파일이 제거된 경우 장애 상황에서 애플리케이션을 정상적으로 실행할 수 없고, 처음부터 다시 스트림 해야 함
val static = spark.read.json("/data/activity-data")
val streaming = spark.readStream.schema(static.schema)
					 .option("maxFilesPerTrigger", 10)
                     .json("/data/activity-data")
                     .groupBy("gt")
                     .count()
                     
val query = streaming.writeStream.outputStream("complete")
					 .option("checkpintLocation", "/some/location") // 체크포인트 장소
                     .queryName("test_stream")
                     .format("memory")
                     .start()
WAL이란?
WAL (Write-Ahead Log)은 데이터를 안정적으로 처리하고 장애 복구를 지원하기 위해 사용하는 메커니즘입니다. WAL은 처리할 데이터를 디스크에 기록한 후에 처리를 시작함으로써 데이터 손실을 방지합니다.


1. 입력 데이터 저장:
- 데이터를 처리하기 전에 먼저 WAL에 기록합니다.
- Spark는 이 데이터를 디스크에 안전하게 저장하여 장애가 발생하더라도 데이터를 복구할 수 있도록 합니다.
2. 데이터 처리:
- 데이터가 WAL에 안전하게 저장된 후에 데이터를 처리합니다.
- 처리 중 장애가 발생하더라도 WAL에서 데이터를 다시 읽어서 재처리할 수 있습니다.
3. 복구 지원:
- Spark Streaming 애플리케이션이 중단되거나 노드에 장애가 발생하면, WAL에 저장된 데이터를 사용해 동일한 상태에서 처리를 재개할 수 있습니다.


spark.streaming.receiver.writeAheadLog.enable=true 를 킴으로써 사용할 수 있음


2. 애플리케이션 변경(수정)

  • 체크포인팅은 현재까지 처리한 스트림과 모든 중간 상태를 저장
  • 스트리밍 애플리케이션을 업데이트할 때 이전 체크포인트 데이터를 고려해야 함

스트리밍 애플리케이션 코드 업데이트하기

  • structured streaming은 application을 다시 시작하기 전에 특정 유형의 application 코드를 변경할 수 있도록 설계되어 있음
    • 사용자 정의 함수는 시그니처가 같은 경우에만 코드를 변경할 수 있음
    • ex) application에 새로운 유형의 데이터가 유입되어 데이터 파싱 함수에서 오류가 발생했을 경우, 다시 컴파일하여 장애가 발생한 부분부터 적용 가능
    • 새로운 컬럼을 추가하거나 사용자 정의 함수를 변경하는 수정은 '중대한 변화'가 아님
    • streaming application에 새로운 집계 키를 추가하거나 쿼리를 완전히 변경하는 등의 업데이트가 발생한다면 스파크는 이전 체크포인트 디렉터리에 저장된 정보에서 새로운 쿼리에 필요한 상태 정보를 만들어 내지 못함(이것이 '중대한 변화')

3. 메트릭과 모니터링

1) 쿼리 상태

  • query.status를 통해 스트림에서 어떤 처리를 하는 지 확인 가능, startStream 메서드에서 반환한 쿼리 객체의 status속성으로 확인할 수 있음
{
    "message" : "Getting offsets from ...",
    "isDataAvailable" : true,
    "isTriggerActive" : true
}

2) 최근 진행 상황

  • 진행 상황 API인 query.recentProgress명령으로 처리율과 배치 주기 등 시간 기반의 정보를 확인 할 수 있음
    • 튜플을 얼마나 처리하고 있는지
    • 소스에서 이벤트가 얼마나 빠르게 들어오는지
    • 스트리밍 쿼리의 진행 상황은 스트림이 사용하는 입력 소스와 출력 싱크의 정보도 함께 제공
  • 출력된 정보는 특정시점(쿼리 진행 상황을 요청한 시점)의 스냅샷
val recentProgress: Array[StreamingQueryProgress] = query.recentProgress

recentProgress.foreach { progress =>
  println(s"Batch ID: ${progress.batchId}")
  println(s"Timestamp: ${progress.timestamp}")
  println(s"Input Rows: ${progress.numInputRows}")
  println(s"Input Rows Per Second: ${progress.inputRowsPerSecond}")
  println(s"Processed Rows Per Second: ${progress.processedRowsPerSecond}")
  println(s"Duration (ms): ${progress.durationMs}")
  println(s"State Operators: ${progress.stateOperators.mkString(",")}")
  println("----------------------------")
}
Array(
  StreamingQueryProgress(
    id = "5ebd8c25-b6b7-4323-b2c9-8d0192b2fcae",
    runId = "2b5c5d64-c6f3-4c94-b91d-9dc6bf52a1e7",
    name = null,
    timestamp = "2025-02-17T12:00:01.000Z",
    batchId = 1,
    durationMs = Map("addBatch" -> 100, "getBatch" -> 5, "queryPlanning" -> 10),
    eventTime = Map("max" -> "2025-02-17T12:00:00.000Z", "avg" -> "2025-02-17T11:59:30.000Z"),
    numInputRows = 1000,
    inputRowsPerSecond = 2000.0, // 유입률
    processedRowsPerSecond = 2500.0, // 처리율
    stateOperators = Array(
      StateOperatorProgress(
        numRowsTotal = 5000,
        numRowsUpdated = 1000,
        memoryUsedBytes = 204800
      )
    )
  ),
  StreamingQueryProgress(
    id = "5ebd8c25-b6b7-4323-b2c9-8d0192b2fcae",
    runId = "2b5c5d64-c6f3-4c94-b91d-9dc6bf52a1e7",
    name = null,
    timestamp = "2025-02-17T12:00:02.000Z",
    batchId = 2,
    durationMs = Map("addBatch" -> 120, "getBatch" -> 6, "queryPlanning" -> 15),
    eventTime = Map("max" -> "2025-02-17T12:00:01.000Z", "avg" -> "2025-02-17T11:59:31.000Z"),
    numInputRows = 1200,
    inputRowsPerSecond = 2400.0,
    processedRowsPerSecond = 2600.0,
    stateOperators = Array(
      StateOperatorProgress(
        numRowsTotal = 6200,
        numRowsUpdated = 1200,
        memoryUsedBytes = 256000
      )
    )
  )
)

3) 스파크 UI

  • 각각의 스트리밍 애플리케이션은 스파크 UI에서 트리거마다 생성된 짧은 잡이 누적된 형태로 나타남
  • 애플리케이션의 메트릭, 쿼리 실행 계획, 테스크 주기, 로그 정보를 제공
  • structured streaming은 DStream API와 다르게 'Streaming' 탭을 사용하지 않음

4. 알림(Alert)

  • 잡이 실패하거나 유입률보다 처리율이 떨어지는 경우 자동으로 알려주는 기능이 필요
  • 프로메테우스 같은 모니터링 시스템에 메트릭을 직접 전송하여 Grafana로 보거나 Slack에 알림을 받아보는 기능과 통합
  • 단순히 로그를 기록하여 splunk 같은 로그 집계 시스템을 사용하여 로그를 분석할 수 있음
  • 쿼리 모니터링 알림 외에도 클러스터와 전체 애플리케이션의 상태를 모니터링하고 알림을 발생시켜야 함

5. streaming listner를 사용한 고급 모니터링

  • StreamingQueryListener 클래스를 이용해 비동기 방식으로 스트리밍 쿼리 정보를 수신한 후 다른 시스템에 자동으로 해당 정보를 전송할 수 있으며 견고한 모니터링 및 알림 메커니즘을 구현할 수 있음
  • StreamingQueryListner 클래스를 상속해 자체 로직을 구현해야 하고, 실행 중인 SparkSession에 등록, sparkSession.streams.addListener() 메서드로 사용자 정의 리스너를 추가할 수 있으며 동작 중인 쿼리가 시작, 종료 또는 진행될 때 관련 정보를 수신할 수 있음
  • 스트리밍 리스너는 사용자 정의 코드에 따라 진행 상황을 갱신하거나 상태를 변경하고 이를 외부 시스템에 전달할 수 있음
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Kafka 설정
val kafkaBootstrapServers = "localhost:9092" // Kafka 브로커 주소
val kafkaTopic = "query-progress-topic"       // Kafka 주제 이름

// Kafka 프로듀서 설정
val kafkaProps = new Properties()
kafkaProps.put("bootstrap.servers", kafkaBootstrapServers)
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// Kafka 프로듀서 생성
val kafkaProducer = new KafkaProducer[String, String](kafkaProps)

// StreamingQueryListener 구현
class KafkaStreamingQueryListener extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    val message = s"""{
      "event": "QueryStarted",
      "id": "${event.id}",
      "name": "${event.name}",
      "runId": "${event.runId}",
      "timestamp": "${System.currentTimeMillis()}"
    }"""
    kafkaProducer.send(new ProducerRecord[String, String](kafkaTopic, event.id, message))
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val progress = event.progress
    val message = s"""{
      "event": "QueryProgress",
      "id": "${progress.id}",
      "batchId": ${progress.batchId},
      "timestamp": "${progress.timestamp}",
      "numInputRows": ${progress.numInputRows},
      "inputRowsPerSecond": ${progress.inputRowsPerSecond},
      "processedRowsPerSecond": ${progress.processedRowsPerSecond}
    }"""
    kafkaProducer.send(new ProducerRecord[String, String](kafkaTopic, progress.id, message))
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    val terminationReason = event.exception.getOrElse("No Exception")
    val message = s"""{
      "event": "QueryTerminated",
      "id": "${event.id}",
      "runId": "${event.runId}",
      "terminationReason": "${terminationReason}"
    }"""
    kafkaProducer.send(new ProducerRecord[String, String](kafkaTopic, event.id, message))
  }
}

// SparkSession 생성
val spark = SparkSession.builder
  .appName("KafkaStreamingQueryListenerExample")
  .master("local[*]") // 로컬 실행 환경
  .getOrCreate()

// 리스너 등록
val kafkaListener = new KafkaStreamingQueryListener
spark.streams.addListener(kafkaListener)

// 예제 Streaming Query 실행
import spark.implicits._
import org.apache.spark.sql.streaming.Trigger

val inputStream = spark.readStream
  .format("rate") // 예제용으로 rate 소스 사용
  .option("rowsPerSecond", 10) // 초당 10개의 행 생성
  .load()

val query = inputStream
  .selectExpr("value as number", "timestamp")
  .writeStream
  .format("console") // 콘솔에 출력
  .trigger(Trigger.ProcessingTime("2 seconds")) // 2초마다 배치 실행
  .start()

query.awaitTermination() // 스트리밍 쿼리 대기
728x90