본문 바로가기
BigData/Spark & Spark Tuning

[Spark] Spark streaming readStream, writeStream format, option, mode 및 config 정리

by 스파이디웹 2025. 2. 7.
728x90

이번 포스트에는 spark streaming에서 input과 output에서 사용되는 source(format)의 종류와 option을 정리해보겠습니다.

 


1. format

1) readStream.format()

Format 설명
socket TCP 소켓 (netcat 등)에서 텍스트 데이터 수신
kafka Kafka 토픽에서 데이터 수신
file 특정 디렉토리에 추가되는 파일을 실시간으로 읽음
rate 초당 일정한 개수의 행을 생성하는 데이터 소스
rate-micro-batch 마이크로 배치 기반으로 일정한 개수의 행을 생성
delta Delta Lake 테이블을 스트리밍 소스로 사용
parquet Parquet 파일을 스트리밍 소스로 사용
orc ORC 파일을 스트리밍 소스로 사용
json JSON 파일을 스트리밍 소스로 사용
csv CSV 파일을 스트리밍 소스로 사용
avro Avro 파일을 스트리밍 소스로 사용
kinesis AWS kinesis로부터 데이터 수신
pub/sub GCP pub/sub으로부터 데이터 수신
... ...

 

kafka

val ds1 = spark.readStream.format("kafka")
			   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
               .option("subscribe", "topic1")
               .load()
               
ds1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
   .writeStream.format("kafka")
   .option("checkpointLocation", "/to/HDFS-compatible/dir")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .start()

kinesis

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("KinesisStreamExample")
  .getOrCreate()

val df = spark.readStream
  .format("kinesis")
  .option("streamName", "my-kinesis-stream")  // Kinesis 스트림 이름
  .option("region", "us-east-1")  // AWS 리전 설정
  .option("initialPosition", "latest")  // 최신 데이터부터 읽기
  .load()

df.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()

pub/sub

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("PubSubStreamExample")
  .getOrCreate()

val df = spark.readStream
  .format("pubsub")
  .option("projectId", "my-gcp-project")  // GCP 프로젝트 ID
  .option("subscription", "my-subscription")  // Pub/Sub 구독 ID
  .load()

df.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()

 

2) writeStream.format()

Format 설명
console 콘솔에 결과 출력
memory 출력이 메모리에 in-memory 테이블 형태로 저장
kafka Kafka 토픽으로 데이터 전송
parquet Parquet 파일로 저장
orc ORC 파일로 저장
json JSON 파일로 저장
csv CSV 파일로 저장
avro Avro 파일로 저장
delta Delta Lake에 저장
foreach 사용자 정의 Sink 사용
kinesis kinesis에 데이터 전송
pub/sub pub/sub에 데이터 전송

kafka

val query = df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // Kafka 브로커 주소
  .option("topic", "my-topic")  // 보낼 Kafka 토픽
  .option("checkpointLocation", "/tmp/kafka-checkpoint")  // 체크포인트 경로 (필수)
  .outputMode("append")  // Kafka는 append 모드만 지원
  .start()

kinesis

val query = df.writeStream
  .format("kinesis")
  .option("streamName", "my-kinesis-stream")  // Kinesis 스트림 이름
  .option("region", "us-east-1")  // AWS 리전 설정
  .option("checkpointLocation", "/tmp/kinesis-checkpoint")  // 체크포인트 경로 (필수)
  .option("awsAccessKeyId", "YOUR_ACCESS_KEY")  // AWS Access Key
  .option("awsSecretKey", "YOUR_SECRET_KEY")  // AWS Secret Key
  .outputMode("append")  // Kinesis도 append 모드만 지원
  .start()

pub/sub

val query = df.writeStream
  .format("pubsub")
  .option("projectId", "my-gcp-project")  // GCP 프로젝트 ID
  .option("topic", "projects/my-gcp-project/topics/my-topic")  // Pub/Sub 토픽
  .option("checkpointLocation", "/tmp/pubsub-checkpoint")  // 체크포인트 경로 (필수)
  .outputMode("append")  // append 모드만 지원
  .start()

 

* 모두 outputMode append만 지원함

 


2. option

각 format별로 사용할 수 있는 option이 다름

1) readStream

socket(TCP 소켓)

val df = spark.readStream
  .format("socket")
  .option("host", "localhost") // 소켓 서버의 호스트
  .option("port", 9999) // 소켓 서버의 포트
  .load()

 

kafka(Kafka Consumer)

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // Kafka 브로커 리스트
  .option("subscribe", "test_topic") // 하나 이상의 토픽을 구독(쉼표로 구분 가능
  .option("startingOffsets", "earliest") // earliest, latest 지정 가능
  .load()

 

file (파일 스트리밍)

val df = spark.readStream
  .format("csv")
  .option("header", "true")
  .schema(schema) // 스키마 정의
  .load("input/")

 

rate(테스트용 데이터 생성)

val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10") // 초당 생성할 행의 개수(기본 1)
  .option("rampUpTime", "1s") // 초당 생성 속도 증가 시간
  .load()

2) writeStream

마찬가지로 각 format별로 사용할 수 있는 option이 다름

 

console(콘솔 출력)

val query = df.writeStream
  .format("console")
  .option("truncate", "false") // 긴 문자열을 생략할지 여부 (true / false)
  .outputMode("append")
  .start()

 

kafka (Kafka Producer)

val query = df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // Kafka 브로커 주소
  .option("topic", "output_topic") // 전송할 Kafka 토픽
  .option("checkpointLocation", "/tmp/kafka-checkpoint") // 체크포인트 디렉토리 지정
  .start()

 

file (파일 저장)

val query = df.writeStream
  .format("csv")
  .option("path", "output/") // 저장할 디렉토리 경로
  .option("checkpointLocation", "/tmp/checkpoint") // 체크포인트 경로 지정
  .outputMode("append")
  .start()

 

memory (메모리 테이블 저장)

val query = df.writeStream
  .format("memory")
  .queryName("table_name") // 메모리 테이블 이름
  .outputMode("append")
  .start()

 

checkpointLocation(체크포인트, 장애복구)

.option("checkpointLocation", "/tmp/checkpoints")

3. outputMode()

모드 설명
append 새로 추가된 데이터만 출력
complete 전체 데이터 다시 출력
update 변경된 데이터만 출력

4. queryName()

 

  • queryName()은 Spark Streaming에서 실행 중인 쿼리에 이름을 지정하는 기능
  • 이름을 지정하면 StreamingQueryManager에서 쿼리를 쉽게 추적하고 관리할 수 있음
  • queryName()을 사용하면 spark.streams.active 목록에서 해당 쿼리를 찾을 수 있음
val query = df.writeStream
  .format("console")
  .queryName("my_streaming_query")  // 쿼리 이름 설정
  .outputMode("append")
  .start()

 

// 실행 중인 모든 Streaming Query 목록 확인
spark.streams.active.foreach(q => println(q.name))
// 특정 쿼리 정지
spark.streams.active.filter(_.name == "my_streaming_query").foreach(_.stop())

5. awaitTermination()

 

  • awaitTermination()은 Spark Streaming 애플리케이션이 종료되지 않고 계속 실행되도록 유지하는 함수
  • 기본적으로 Spark Streaming은 백그라운드에서 실행되므로, main 스레드가 종료되지 않도록 awaitTermination()을 호출해야 함
  • 특정 시간 동안만 대기할 수도 있음 (awaitTermination(timeoutMillis))
val query = df.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()  // 프로그램이 종료되지 않고 계속 실행됨

 

 

특정 시간 동안만 실행

query.awaitTermination(60000)  // 60초 동안 실행 후 종료

 

728x90

댓글