[Spark Tuning] Spark의 Partition 개념, spark.sql.shuffle.partitions, coalesce() vs
이번 포스트에는 Spark의 Partition 개념, 종류 및 조작관련 팁, 헷갈릴 만한 개념에 대해 포스트 해보겠습니다.
Partition이란?
- Partition은 RDDs나 Dataset를 구성하고 있는 최소 단위 객체이며, 스파크의 성능과 리소스 점유량을 크게 좌우할 수 있는 RDD의 가장 기본적인 개념입니다.
- 데이터 파티셔닝은 데이터를 여러 클러스터 노드로 분할하는 메커니즘을 의미합니다.
- 각 Partition은 서로 다른 노드에서 분산 처리됩니다.즉, 1 Core = 1 Task = 1 Partition입니다.
- Spark에서는 하나의 최소 연산을 Task라고 표현하는데, 이 하나의 Task에서 하나의 Partition이 처리됩니다. 또한, 하나의 Task는 하나의 Core가 연산 처리합니다.
- 이처럼 설정된 Partition 수에 따라 각 Partition의 크기가 결정됩니다. 그리고 이 Partition의 크기가 결국 Core 당 필요한 메모리 크기를 결정하게 됩니다.
- Partition 수 → Core 수
- Partition 크기 → 메모리 크기
- 적은 수의 Partition = 크기가 큰 Partition
- 많은 수의 Partition = 크기가 작은 Partition
Partition의 종류
동일한 Partition이지만 쓰이는 때에 따라 분류를 합니다.
- Input Partition
- Output Partition
- Shuffle Partition
1) input partition spark.conf.set("spark.sql.files.maxPartitionBytes",134217728 )
2) output partition
coalesce()와 repartition()처럼 write시에 partition을 조정하는 옵션
3) shuffle partition
spark.conf.set("spark.sql.shuffle.partitions", 1800) 처럼 셔플에 사용되는 파티션 수를 설정하는 옵션,
spark.conf.set("spark.sql.files.maxPartitionBytes",bytes)
- 처음 파일을 읽을 때 생성하는 파티션
- 기본값은 134217728(128MB)
- 파일(HDFS상의 마지막 경로에 존재하는 파일)의 크기가 128MB보다 크다면 Spark에서 128MB만큼 쪼개면서 파일을 읽는다.
- 파일의 크기가 128MB보다 작다면 그대로 읽어 들여, 파일 하나당 Partition하나가 된다.
- file-based sources인 JSON,parquet,ORC 같은 것에만 적용된다.
spark.conf.set("spark.sql.shuffle.partitions",num of partition)
- spark.sql.shuffle.partitions 옵션은 join,groupBy 혹은 aggregation 같은 연산을 할 시 data shuffling되는 파티션 수를 나타냅니다.
- 파티션의 개수는 default로는 200으로 지정 되어 있으며, spark.conf.set("spark.sql.shuffle.partitions", 1800)와 같은 형태로 조정이 가능합니다.
- Core 수에 맞게 설정하라고 하기보단, Partition의 크기에 맞추어서 설정
- Partition의 크기가 크고 연산에 쓰이는 메모리가 부족하다면 Shuffle Spill(데이터를 직렬화하고 스토리지에 저장, 처리 이후에는 역 직렬 화하고 연산 재개함)이 일어남 -> Task가 지연되고 에러가 발생 -> 하둡클러스터에 에러가 나고 spark 강제종료
- Memory Limit Over와 같이, Shuffle Spill도 메모리 부족으로 나타나는데, 보통 이에 대한 대응을 Core 당 메모리를 늘리는 것으로 해결
- 일반적으로, 하나의 Shuffle Partition 크기가 100~200MB정도 나올 수 있도록 수를 조절하는 것이 최적
- 클러스터의 익스큐터 수보다 파티션의 수를 더 크게 지정하는 것이 대체로 좋다. executor num > partition num
- 로컬 머신에서 처리할 경우 병렬로 처리할 수 있는 task가 제한적이므로 이 값을 작게 설정
coalesce(), repartition()은 언제 사용하는지?
- filter()와 같은 transformation 연산을 수행하다 보면 최초에 설정된 파티션 개수가 적합하지 않은 경우가 있는데, 이럴 때 파티션 수를 조정하는 option을 사용합니다.
- 파일을 저장할 때 생성하는 Partition, 이 Partition의 수가 HDFS 상의 마지막 경로의 파일 수를 지정
- 보통 groupBy 집계 후 저장할 때 데이터의 크기가 작아짐 -> spark.sql.shuffle.partitions 설정에 따라 파일 수가 지정되는데, 이때 파일의 크기를 늘리기 위해 repartition와 coalesce을 사용해 Partition 수를 줄임으로서 파일의 크기를 늘림
- df.where()를 통해 필터링을 하고 나서 그대로 저장한다면 파편화가 생김 -> repartition(cnt) (셔플)을 한 후 저장
coalesce() vs repartition()
- 이 2개의 가장 큰 차이는 셔플을 하냐 안하냐의 차이입니다(default로 봤을 때)
- coalesce는 파티션을 줄일 때 사용하고, repartition()은 파티션 수를 늘리거나 줄일 때 사용합니다.
- coalesce(numofpartition,true) 처럼 true 옵션을 줬을 때에는 강제로 셔플을 할 수 있습니다. -> true옵션을 주게 되면 파티션을 늘릴 수도 있습니다.
- default값으로 사용하려면 파티션 수를 줄일 땐 coalesce(), 파티션 수를 늘릴 땐 repartition()사용
coalesce()메소드와 repartition()메소드 내부 코드
repartition내부는 coalesce메소드를 호출하는 형태로 되어있습니다.
coalesce()또한 shuffle을 주는 옵션이 포함되어 있는 것을 확인할 수 있습니다.
spark.conf.set("spark.sql.files.maxRecordsPerFile",num of records)
- spark write시에 파일의 record수를 지정하여 해당 record수 별로 파일을 생성 할 수 있는 config
- write시에 .option("maxRecordsPerFile",100000) 으로도 지정 가능
ex 1)
spark.conf.set("spark.sql.files.maxRecordsPerFile",1000000)
ex 2)
df.write.format("csv")\
.option("header","true").option("inferschema","true").option("maxRecordsPerFile",1000000)\
.save("path")
*csv, parquet format둘 다 적용된다.
df partition수 확인하기
df.rdd.getNumPartitions()
(헷갈릴 만한 개념)df write시에 repartition(),coalesce() vs partitionBy()
- Pyspark repartition(),coalesce()는 Dataframe method로 disk에 쓸 때 메모리에 있는 파티션의 수를 늘리고 줄이는데 사용, 모든 부분의 파일을 하나의 디렉토리에 담겨서 생성
df.coalesce(10).write.format("parquet").mode("overwrite").save("path")
df.repartition(10).write.format("parquet").mode("overwrite").save("path")
위와 같은 순서로 df바로 뒤에 사용
- Pyspark partitionBy()는 DataframeWriter class의 method로 disk에 Dataframe을 Partition Column의 유니크한 값으로 sub-directory로써 파티셔닝 할 때 사용
df.write.partitionBy("columnname").format("parquet").mode("overwrite").save("path")
df.write.format("parquet").partitionBy("columnname").mode("overwrite").save("path")
df.write.format("parquet").mode("overwrite").partitionBy("columnname").save("path")
위와같이 write뒤에 partitionBy()메소드를 사용
Spark partition 수 조정에 대해서 보시려면 아래의 링크를 참조해 주세요.
https://spidyweb.tistory.com/335
참조:
https://tech.kakao.com/2021/10/08/spark-shuffle-partition/
https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-partitionby/
수정 일시:
2022-04-07