BigData/Spark & Spark Tuning

[Spark Tuning] PartitionFilters vs PushedFilter 비교, predicate pushdown vs projection pushdown

스파이디웹 2023. 1. 1. 21:01
728x90

이번 포스트에서는 spark에서 partitionfilter의 개념과 언제 적용되는지, 그리고 확인하는 방법을 확인하고

PushedFilter와의 차이점,마지막으로 PushedFilter의 종류 predicate과 projection의 차이를 알아보겠습니다.


1. PartitionFilters vs PushedFilters

PartitionFilters란

특정 파티션에서만 데이터를 가져오고 관련 없는 파티션은 모두 생략합니다. 데이터 스캔 생략은 성능의 큰 향상을 가져다 줍니다.
PushedFilters 보다 선행된다.
PartitionFilter기술은 조건에 사용된 컬럼이 파티션되어 있고 스파크가 필터 조건을 충족하는 특정 디렉터리만 스캔할 때 사용됩니다.
PushedFilters란
PushedFilters 기술은 각 row 그룹의 최소 및 최대 통계를 먼저 읽고 최소 및 최대 범위가 필터 조건을 충족하는지 여부를 결정하여 메모리에 로드되는 데이터 양을 줄이는 데 도움이 됩니다.

 

parquet 파일에 pushdown이 적용되려면 3.0부터 나온 설정 값으로 sparksession 생성 시 아래의 설정값들은 활성화 되어야 함(기본값은 true로 활성화 되어있음)

"spark.sql.parquet.filterPushdown", "true"
"spark.hadoop.parquet.filter.stats.enabled", "true"
"spark.sql.optimizer.nestedSchemaPruning.enabled","true"
"spark.sql.optimizer.dynamicPartitionPruning.enabled", "true"

1) 파티션(디스크)되지 않은 데이터

데이터 형태


데이터 로드

df_nonpartition = spark.read.format("parquet").load("partition되지 않은 데이터 디렉토리경로")

조건식 없는 실행계획

df_nonpartition.explain()

데이터를 읽어오기만 하면 데이터가 파티션(디스크)된지에 상관없이 실행계획 PartitionFilters 와 PushedFilters에는 아무것도 안나옴


년도 조건의 실행계획

df_nonpartition.where(F.col("YEAR")=="2022").explain()

파티션(디스크)되지 않은 데이터에 "YEAR ==2022" 조건으로 실행계획을 확인하면 PartitionFilters가 아닌 PushedFilters에 설정한 조건이 나옴

df_nonpartition.where(F.col("YEAR")=="2022").show()

읽은 파일 수: 200개(출력 partition 수,disk x)

scan 시간: 590ms

size of files read: 496.5KiB

출력 rows 수:

(parquet는 columnar format으로 batch 단위로 읽게 된다.)

(csv의 경우 .show(숫자) 에 해당하는 row 수 + header만큼의 number of output rows 수)
(6개의 input batch 기준)287개 →(filter-2022) 21개 → (show)20개 출력


년도,시간 조건의 실행계획

df_nonpartition.where((F.col("YEAR")=="2022") & (F.col("CRASH_TIME_HH")=="06:00")).explain()

파티션(디스크)되지 않은 데이터에 "YEAR ==2022"  & "CRASH_TIME_HH == 06:00"조건으로 실행계획을 확인하면 PartitionFilters가 아닌 둘 다 PushedFilters에 설정한 조건이 나옴

 

DataFilters에 년도와 시간 모두 조건으로 나옴

df_nonpartition.where((F.col("YEAR")=="2022") & (F.col("CRASH_TIME_HH")=="06:00")).show()

읽은 파일 수: 200개(출력 partition 수,disk x)

scan 시간: 2.6s

size of files read: 496.5KiB

출력 rows 수:

(parquet는 columnar format으로 batch 단위로 읽게 된다.)

(csv의 경우 .show(숫자) 에 해당하는 row 수 + header만큼의 number of output rows 수)
(57개의 input batch 기준)1073개 →(filter-2022) 5개 → (show)5개 출력


2) 파티션(디스크)된 데이터(YEAR,MONTH)

데이터 형태


데이터 로드

df_partition = spark.read.format("parquet").load("partition된 데이터 디렉토리 경로")

조건식 없는 실행계획

df_partition.explain()

데이터를 읽어오기만 하면 데이터가 파티션(디스크)된지에 상관없이 실행계획 PartitionFilters 와 PushedFilters에는 아무것도 안나옴


년도 조건의 실행계획

df_partition.where(F.col("YEAR")=="2022").explain()

년도,월로 파티션(디스크)된  데이터에 "YEAR ==2022" 조건으로 실행계획을 확인하면 PartitionFilters에 설정한 조건이 나옴

df_partition.where(F.col("YEAR")=="2022").show()

읽은 파일 수: 22개 disk partition (YEAR=2022 폴더 내의 파일 수)

scan 시간: 333ms

size of files read: 42.9KiB

출력 rows 수:

(parquet는 columnar format으로 batch 단위로 읽게 된다.)

(csv의 경우 .show(숫자) 에 해당하는 row 수 + header만큼의 number of output rows 수)
(2개의 input batch 기준)38개 → (show)5개 출력


년도,시간 조건의 실행계획

df_partition.where((F.col("YEAR")=="2022") & (F.col("CRASH_TIME_HH")=="06:00")).explain()

년도,월로 파티션(디스크)된 데이터에 년도,시간 조건의 실행계획을 확인해보면, 년도로 파티션된 데이터는 PartitionsFilters에 조건이 나오게 되고, 파티션되지 않은 시간 데이터는 PushedFilters 조건에 나오게 됨

 

DataFilters에도 시간관련 조건식만 나옴

PartitionFilters의 조건과 PushedFilters의 조건을 같이 사용하면 먼저 PartitionFilters의 조건을 만족하는 디렉토리를 찾을 것이고, 이후에 각 파일 메타데이터를 읽어 PushedFilters의 조건을 만족하는 row그룹에 대해 메모리에 올리게 됨

df_partition.where((F.col("YEAR")=="2022") & (F.col("CRASH_TIME_HH")=="06:00")).show()

읽은 파일 수: 22개 disk partition (YEAR=2022 폴더 내의 파일 수)

scan 시간: 654ms

size of files read: 42.9KiB

출력 rows 수:

(parquet는 columnar format으로 batch 단위로 읽게 된다.)

(csv의 경우 .show(숫자) 에 해당하는 row 수 + header만큼의 number of output rows 수)
(4개의 input batch 기준)73개 →(filter-06:00) 5개 → (show)5개 출력


3) 결론

  • disk partition된 데이터에 filter(where) 조건으로 조회 할 시, 조건에 맞는 디렉토리만 scan하게 되고, 이것 은 곧 스캔되는 데이터의 양과 시간을 줄이게 해줌
  • parquet는 columnar format으로 batch 단위로 데이터를 처리하는데, action에 사용되는 row 수에 따라 batch 수가 정해지며, 읽는 row수도 달라짐(csv는 action에 쓰이는 row 수랑 동일하게 row를 읽음)
  • disk partitioned 된 것은 partition filter로, pushed filter보다 선행되어 데이터를 스캔하게되며, disk partitioned가 안된 데이터는 partition filter보다 후행되어 pushed filter처리 됨

2. Predicate pushdown vs Projection pushdown

1) Predicate pushdown

  • where 혹은 filter 절에서 적용 됨
  • 어떤 column이 필터링될 것 인지가 아닌 어떤 row가 필터링 될 것인지를 나타냄(row-based)
  • PartitionFilters, PushedFilters가 해당됨
  • 'library.books'처럼 중첩된 컬럼에 조건을 걸면 단지 Null이 아닌 값의 레코드를 반환 함
  • partition pruning(PartitionFilters)가 여기에 속함
  • parquet 형식의 파일은 해당 값의 최소값과 최대값을 포함하여 각 열에 대해 몇 가지 다른 통계 메트릭을 유지, 관련 없는 데이터를 건너뛰고 필요한 데이터를 처리

2) Projection pushdown

  • select 절에서 적용 됨
  • 어떤 column이 필터링 될 것인지를 나타냄(column-based)
  • columnar fomat file(parquet)에 적용
  • table scan 프로세스에서 불필요한 필드(column)를 제거하여 file system/database와 spark engine 간의 데이터 전송을 최소화
  • 주로 데이터 집합에 너무 많은 열이 포함되어 있을 때 유용

 

 

 

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Optimizer-PushDownPredicate.html

 

PushDownPredicate · The Internals of Spark SQL

 

jaceklaskowski.gitbooks.io

 

https://towardsdatascience.com/predicate-vs-projection-pushdown-in-spark-3-ac24c4d11855

728x90