728x90
이번 포스트에는 spark streaming에서 input과 output에서 사용되는 source(format)의 종류와 option을 정리해보겠습니다.
1. format
1) readStream.format()
Format | 설명 |
socket | TCP 소켓 (netcat 등)에서 텍스트 데이터 수신 |
kafka | Kafka 토픽에서 데이터 수신 |
file | 특정 디렉토리에 추가되는 파일을 실시간으로 읽음 |
rate | 초당 일정한 개수의 행을 생성하는 데이터 소스 |
rate-micro-batch | 마이크로 배치 기반으로 일정한 개수의 행을 생성 |
delta | Delta Lake 테이블을 스트리밍 소스로 사용 |
parquet | Parquet 파일을 스트리밍 소스로 사용 |
orc | ORC 파일을 스트리밍 소스로 사용 |
json | JSON 파일을 스트리밍 소스로 사용 |
csv | CSV 파일을 스트리밍 소스로 사용 |
avro | Avro 파일을 스트리밍 소스로 사용 |
kinesis | AWS kinesis로부터 데이터 수신 |
pub/sub | GCP pub/sub으로부터 데이터 수신 |
... | ... |
kafka
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream.format("kafka")
.option("checkpointLocation", "/to/HDFS-compatible/dir")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
kinesis
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
.appName("KinesisStreamExample")
.getOrCreate()
val df = spark.readStream
.format("kinesis")
.option("streamName", "my-kinesis-stream") // Kinesis 스트림 이름
.option("region", "us-east-1") // AWS 리전 설정
.option("initialPosition", "latest") // 최신 데이터부터 읽기
.load()
df.writeStream
.format("console")
.outputMode("append")
.start()
.awaitTermination()
pub/sub
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
.appName("PubSubStreamExample")
.getOrCreate()
val df = spark.readStream
.format("pubsub")
.option("projectId", "my-gcp-project") // GCP 프로젝트 ID
.option("subscription", "my-subscription") // Pub/Sub 구독 ID
.load()
df.writeStream
.format("console")
.outputMode("append")
.start()
.awaitTermination()
2) writeStream.format()
Format | 설명 |
console | 콘솔에 결과 출력 |
memory | 출력이 메모리에 in-memory 테이블 형태로 저장 |
kafka | Kafka 토픽으로 데이터 전송 |
parquet | Parquet 파일로 저장 |
orc | ORC 파일로 저장 |
json | JSON 파일로 저장 |
csv | CSV 파일로 저장 |
avro | Avro 파일로 저장 |
delta | Delta Lake에 저장 |
foreach | 사용자 정의 Sink 사용 |
kinesis | kinesis에 데이터 전송 |
pub/sub | pub/sub에 데이터 전송 |
kafka
val query = df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092") // Kafka 브로커 주소
.option("topic", "my-topic") // 보낼 Kafka 토픽
.option("checkpointLocation", "/tmp/kafka-checkpoint") // 체크포인트 경로 (필수)
.outputMode("append") // Kafka는 append 모드만 지원
.start()
kinesis
val query = df.writeStream
.format("kinesis")
.option("streamName", "my-kinesis-stream") // Kinesis 스트림 이름
.option("region", "us-east-1") // AWS 리전 설정
.option("checkpointLocation", "/tmp/kinesis-checkpoint") // 체크포인트 경로 (필수)
.option("awsAccessKeyId", "YOUR_ACCESS_KEY") // AWS Access Key
.option("awsSecretKey", "YOUR_SECRET_KEY") // AWS Secret Key
.outputMode("append") // Kinesis도 append 모드만 지원
.start()
pub/sub
val query = df.writeStream
.format("pubsub")
.option("projectId", "my-gcp-project") // GCP 프로젝트 ID
.option("topic", "projects/my-gcp-project/topics/my-topic") // Pub/Sub 토픽
.option("checkpointLocation", "/tmp/pubsub-checkpoint") // 체크포인트 경로 (필수)
.outputMode("append") // append 모드만 지원
.start()
* 모두 outputMode append만 지원함
2. option
각 format별로 사용할 수 있는 option이 다름
1) readStream
socket(TCP 소켓)
val df = spark.readStream
.format("socket")
.option("host", "localhost") // 소켓 서버의 호스트
.option("port", 9999) // 소켓 서버의 포트
.load()
kafka(Kafka Consumer)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092") // Kafka 브로커 리스트
.option("subscribe", "test_topic") // 하나 이상의 토픽을 구독(쉼표로 구분 가능
.option("startingOffsets", "earliest") // earliest, latest 지정 가능
.load()
file (파일 스트리밍)
val df = spark.readStream
.format("csv")
.option("header", "true")
.schema(schema) // 스키마 정의
.load("input/")
rate(테스트용 데이터 생성)
val df = spark.readStream
.format("rate")
.option("rowsPerSecond", "10") // 초당 생성할 행의 개수(기본 1)
.option("rampUpTime", "1s") // 초당 생성 속도 증가 시간
.load()
2) writeStream
마찬가지로 각 format별로 사용할 수 있는 option이 다름
console(콘솔 출력)
val query = df.writeStream
.format("console")
.option("truncate", "false") // 긴 문자열을 생략할지 여부 (true / false)
.outputMode("append")
.start()
kafka (Kafka Producer)
val query = df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092") // Kafka 브로커 주소
.option("topic", "output_topic") // 전송할 Kafka 토픽
.option("checkpointLocation", "/tmp/kafka-checkpoint") // 체크포인트 디렉토리 지정
.start()
file (파일 저장)
val query = df.writeStream
.format("csv")
.option("path", "output/") // 저장할 디렉토리 경로
.option("checkpointLocation", "/tmp/checkpoint") // 체크포인트 경로 지정
.outputMode("append")
.start()
memory (메모리 테이블 저장)
val query = df.writeStream
.format("memory")
.queryName("table_name") // 메모리 테이블 이름
.outputMode("append")
.start()
checkpointLocation(체크포인트, 장애복구)
.option("checkpointLocation", "/tmp/checkpoints")
3. outputMode()
모드 | 설명 |
append | 새로 추가된 데이터만 출력 |
complete | 전체 데이터 다시 출력 |
update | 변경된 데이터만 출력 |
4. queryName()
- queryName()은 Spark Streaming에서 실행 중인 쿼리에 이름을 지정하는 기능
- 이름을 지정하면 StreamingQueryManager에서 쿼리를 쉽게 추적하고 관리할 수 있음
- queryName()을 사용하면 spark.streams.active 목록에서 해당 쿼리를 찾을 수 있음
val query = df.writeStream
.format("console")
.queryName("my_streaming_query") // 쿼리 이름 설정
.outputMode("append")
.start()
// 실행 중인 모든 Streaming Query 목록 확인
spark.streams.active.foreach(q => println(q.name))
// 특정 쿼리 정지
spark.streams.active.filter(_.name == "my_streaming_query").foreach(_.stop())
5. awaitTermination()
- awaitTermination()은 Spark Streaming 애플리케이션이 종료되지 않고 계속 실행되도록 유지하는 함수
- 기본적으로 Spark Streaming은 백그라운드에서 실행되므로, main 스레드가 종료되지 않도록 awaitTermination()을 호출해야 함
- 특정 시간 동안만 대기할 수도 있음 (awaitTermination(timeoutMillis))
val query = df.writeStream
.format("console")
.outputMode("append")
.start()
query.awaitTermination() // 프로그램이 종료되지 않고 계속 실행됨
특정 시간 동안만 실행
query.awaitTermination(60000) // 60초 동안 실행 후 종료
728x90
'BigData > Spark & Spark Tuning' 카테고리의 다른 글
[Spark] Spark Streaming 운영 환경에서의 Structured Streaming (0) | 2025.02.10 |
---|---|
[Spark] Spark Streaming 이벤트 시간과 상태 기반 처리 정리 (0) | 2025.02.10 |
[Spark] Spark Streaming, Structured Streaming 기초 정리 (0) | 2025.02.06 |
[Spark] EMR을 구성하는 instance는 큰 spec이 유리할까? 작은 spec이 유리할까? (0) | 2025.01.14 |
[Spark] 프로그래밍 언어 별 Spark 성능, 속도 차이 (UDF와 직렬화) (0) | 2025.01.10 |
댓글