본문 바로가기
BigData/Spark & Spark Tuning

[Spark Tuning] Spark의 Partition 개념, spark.sql.shuffle.partitions, coalesce() vs repartition(), partitionBy() 정리

by 스파이디웹 2021. 11. 23.
728x90

이번 포스트에는 Spark의 Partition 개념, 종류 및 조작관련 팁, 헷갈릴 만한 개념에 대해 포스트 해보겠습니다.


Partition이란?

  • Partition은 RDDs나 Dataset를 구성하고 있는 최소 단위 객체이며, 스파크의 성능과 리소스 점유량을 크게 좌우할 수 있는 RDD의 가장 기본적인 개념입니다.
  • 데이터 파티셔닝은 데이터를 청크 단위로 여러 클러스터 노드로 분할하는 메커니즘을 의미합니다.
  • 클러스터의 물리적 머신에 존재하는 로우의 집합
  • 각 Partition은 서로 다른 노드에서 분산 처리됩니다.즉1 Core = 1 Task = 1 Partition입니다. 
  • Spark에서는 하나의 최소 연산을 Task라고 표현하는데, 이 하나의 Task에서 하나의 Partition이 처리됩니다. 또한, 하나의 Task는 하나의 Core가 연산 처리합니다.
  • 이처럼 설정된 Partition 수에 따라 각 Partition의 크기가 결정됩니다. 그리고 이 Partition의 크기가 결국 Core 당 필요한 메모리 크기를 결정하게 됩니다.
    • Partition 수 → Core 수
    • Partition 크기 → 메모리 크기
    따라서, Partition의 크기와 수가 Spark 성능에 큰 영향을 미치는데, 통상적으로는 Partition의 크기가 클수록 메모리가 더 필요하고, Partition의 수가 많을수록 Core가 더 필요합니다.
    • 적은 수의 Partition = 크기가 큰 Partition
    • 많은 수의 Partition = 크기가 작은 Partition
    즉, Partition의 수를 늘리는 것은 Task 당 필요한 메모리를 줄이고 병렬화의 정도를 늘립니다.

Partition의 종류
동일한 Partition이지만 쓰이는 때에 따라 분류를 합니다.

  • Input Partition: 데이터를 Spark 작업에 읽어들일 때 생성되는 파티션
  • Output Partition: 작업의 결과를 저장하는 파티션
  • Shuffle Partition: 데이터를 reshuffling할 때 생성되는 파티션

1) input partition spark.conf.set("spark.sql.files.maxPartitionBytes",134217728 )
2) output partition
coalesce()와 repartition()처럼 write시에 partition을 조정하는 옵션
3) shuffle partition
spark.conf.set("spark.sql.shuffle.partitions", 1800) 처럼 셔플에 사용되는 파티션 수를 설정하는 옵션,

 


spark.conf.set("spark.sql.files.maxPartitionBytes",bytes)

  • 처음 파일을 읽을 때 생성하는 파티션
  • 기본값은 134217728(128MB)
  • 파일(HDFS상의 마지막 경로에 존재하는 파일)의 크기가 128MB보다 크다면 Spark에서 128MB만큼 쪼개면서 파일을 읽는다.
  • 파일의 크기가 128MB보다 작다면 그대로 읽어 들여, 파일 하나당 Partition하나가 된다.
  • file-based sources인 JSON,parquet,ORC 같은 것에만 적용된다.

spark.conf.set("spark.sql.shuffle.partitions",num of partition)

  • spark.sql.shuffle.partitions 옵션은 join,groupBy 혹은 aggregation 같은 연산을 할 시 data shuffling되는 파티션 수를 나타냅니다.
  • 파티션의 개수는 default로는 200으로 지정 되어 있으며, spark.conf.set("spark.sql.shuffle.partitions", 1800)와 같은 형태로 조정이 가능합니다.
  • Core 수에 맞게 설정하라고 하기보단, Partition의 크기에 맞추어서 설정
  •  Partition의 크기가 크고 연산에 쓰이는 메모리가 부족하다면 Shuffle Spill(데이터를 직렬화하고 스토리지에 저장, 처리 이후에는 역 직렬 화하고 연산 재개함)이 일어남 -> Task가 지연되고 에러가 발생 -> 하둡클러스터에 에러가 나고 spark 강제종료
  • Memory Limit Over와 같이, Shuffle Spill도 메모리 부족으로 나타나는데, 보통 이에 대한 대응을 Core 당 메모리를 늘리는 것으로 해결
  • 일반적으로, 하나의 Shuffle Partition 크기가 100~200MB정도 나올 수 있도록 수를 조절하는 것이 최적
  • 클러스터의 익스큐터 수보다 파티션의 수를 더 크게 지정하는 것이 대체로 좋다. executor num > partition num
  • 로컬 머신에서 처리할 경우 병렬로 처리할 수 있는 task가 제한적이므로 이 값을 작게 설정

coalesce(), repartition()은 언제 사용하는지?

  • filter()와 같은 transformation 연산을 수행하다 보면 최초에 설정된 파티션 개수가 적합하지 않은 경우가 있는데, 이럴 때 파티션 수를 조정하는 option을 사용합니다.
  • 파일을 저장할 때 생성하는 Partition(Output Partition), 이 Partition의 수가 HDFS 상의 마지막 경로의 파일 수를 지정
  • 보통 groupBy 집계 후 저장할 때 데이터의 크기가 작아짐 -> spark.sql.shuffle.partitions 설정에 따라 파일 수가 지정되는데, 이때 파일의 크기를 늘리기 위해 repartition와 coalesce을 사용해 Partition 수를 줄임으로서 파일의 크기를 늘림
  • df.where()를 통해 필터링을 하고 나서 그대로 저장한다면 파편화가 생김 -> repartition(cnt) (셔플)을 한 후 저장

coalesce() vs repartition()

  • 이 2개의 가장 큰 차이는 셔플을 하냐 안하냐의 차이입니다(default로 봤을 때)
  • coalesce는 파티션을 줄일 때 사용하고, repartition()은 파티션 수를 늘리거나 줄일 때 사용합니다.
  • coalesce(numofpartition,true) 처럼 true 옵션을 줬을 때에는 강제로 셔플을 할 수 있습니다. -> true옵션을 주게 되면 파티션을 늘릴 수도 있습니다.
  • default값으로 사용하려면 파티션 수를 줄일 땐 coalesce(), 파티션 수를 늘릴 땐 repartition()사용

coalesce()메소드와 repartition()메소드 내부 코드

repartition내부는 coalesce메소드를 호출하는 형태로 되어있습니다.

coalesce()또한 shuffle을 주는 옵션이 포함되어 있는 것을 확인할 수 있습니다.


spark.conf.set("spark.sql.files.maxRecordsPerFile",num of records)

  • spark write시에 파일의 record수를 지정하여 해당 record수 별로 파일을 생성 할 수 있는 config
  • write시에 .option("maxRecordsPerFile",100000) 으로도 지정 가능

ex 1)
spark.conf.set("spark.sql.files.maxRecordsPerFile",1000000)
 
ex 2)
df.write.format("csv")\
   .option("header","true").option("inferschema","true").option("maxRecordsPerFile",1000000)\
   .save("path")
 
*csv, parquet format둘 다 적용된다.


df partition수 확인하기

df.rdd.getNumPartitions()

(헷갈릴 만한 개념)df write시에 repartition(),coalesce() vs partitionBy()

  • Pyspark repartition(),coalesce()는 Dataframe method로 disk에 쓸 때 메모리에 있는 파티션의 수를 늘리고 줄이는데 사용, 모든 부분의 파일을 하나의 디렉토리에 담겨서 생성
  • hash-based partitioner
df.coalesce(110).write.format("parquet").mode("overwrite").save("path")
df.repartition(110).write.format("parquet").mode("overwrite").save("path")


위와 같은 순서로 df바로 뒤에 사용

# 디렉토리 및 파일 구조
/path/to/output/
    part-00000.parquet
    part-00001.parquet
    part-00002.parquet
    ...
    part-00009.parquet

예시(파티션 숫자대로 파일 갯수가 나옴)


  • Pyspark partitionBy()는 DataframeWriter class의 method로 disk에 Dataframe을 Partition Column의 유니크한 값으로 sub-directory로써 파티셔닝 할 때 사용
    • partitionBy() 호출 → 셔플링(Shuffle) → 파티션별로 디렉토리 생성 → 파일 쓰기 (디스크에 저장)
    • 컬럼 값에 따라 디렉토리가 구분되고, 파티션에 맞는 파일들이 디스크에 기록
  • 파티셔닝 컬럼 값은 중간 메모리 단계에서 처리되고, 그 값에 따라 디렉토리가 생성되는 방식으로 동작
    • partitionBy("columnname") 에서 columnname에 해당하는 컬럼의 값이 중간 메모리 단계에서 처리되고, 셔플링 되고 디렉토리 및 파일을 생성하는 구조
df.write.partitionBy("columnname").format("parquet").mode("overwrite").save("path")
df.write.format("parquet").partitionBy("columnname").mode("overwrite").save("path")
df.write.format("parquet").mode("overwrite").partitionBy("columnname").save("path")


위와같이 write뒤에 partitionBy()메소드를 사용

# 디렉토리 구조 및 파일

/path/to/output/
    region=us/
        part-0000.parquet
        part-0001.parquet
    region=eu/
        part-0000.parquet
    region=asia/
        part-0000.parquet


Spark partition 수 조정에 대해서 보시려면 아래의 링크를 참조해 주세요.
https://spidyweb.tistory.com/335
 
참조:
https://tech.kakao.com/2021/10/08/spark-shuffle-partition/
https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-partitionby/
 
수정 일시:
2022-04-07

2025-01-14

728x90

댓글