BigData/Spark & Spark Tuning

[Spark] PySpark read & write + partitioning 간단한 예시문제

스파이디웹 2022. 8. 23. 23:58
728x90

python read&write 과 partition에 대해

아래의 링크에서 받은 파일로 다음과 같은 문제를 만들었습니다.

 

https://github.com/udacity/data-analyst/blob/master/projects/bike_sharing/201508_trip_data.csv

혹은

아래의 링크에서 201508_trip_data.csv 파일 다운로드 후 Spark에서 Data Read

https://github.com/Spidyweb-3588/python_skillup/tree/main/201508data

 

 

문법적인 자유도에 관한 문제를 보시려면 아래의 링크를 참조해주세요

2022.03.26 - [BigData/Spark & Spark Tuning] - [Spark] 스파크의 문법적 자유도, 스키마 조작, dummy 생성


PySpark Read & Write 문제를 풀기 위해 필요한 코드

  • Spark session생성
  • 필요한 라이브러리 호출
  • csvfile read
import findspark#필요한 라이브러리
findspark.init()

from pyspark.sql import SparkSession#필요한 라이브러리
from pyspark.sql import functions as F#필요한 라이브러리

spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()#sparksession local로 생성
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)#JupyterNotebook(lab)에서만 적용되는 config

#각자 csv파일 로케이션에 따라 file을 read한다.
df = spark.read.format("csv").option("header","true").option("inferschema","true").load("~/location/file.csv")

read & write with partition 조작 문제

1. 데이터를 읽어서 parquet 형식 파일로 저장

df.write.format("parquet")\
        .mode("overwrite")\
        .save("~/transform to parquet")

2. Start Station 별로 partition 하여 저장

  • partition이 된다는 것은 partition으로 지정된 column의 unique한 값으로 sub directory가 구분이 되고 그 안에 파일이 써진다는 것
df.write.format("parquet")\
  .mode("overwrite")\
  .partitionBy("Start_Station")\
  .save("~/parquet with partitions")

3. (non-partition으로) 데이터 셋을 1개의 parquet 파일로 저장

#output partition 조작
df.coalesce(1).write.format("parquet")\
  .mode("overwrite")\
  .save("~/one parquet file")

4. (non-partition으로) 데이터 셋을 5개의 파일로 나누어 저장

#output partition 조작
df.coalesce(5).write.format("parquet")\
  .mode("overwrite")\
  .save("~/five parquet file")

5. csv로 적재된 원본 데이터 셋을 partition당 사이즈가 10MB를 넘지 않도록 나누어 data frame에 적재

#input partition 조작
spark.conf.set("spark.sql.files.maxPartitionBytes",1000000)

df.write.format("csv")\
  .mode("overwrite")\
  .save("~/under 10MB csv file")

6. (non-partition으로) 데이터 셋을 파일당 10만개 Row를 보관하도록 저장

df.repartition(1).write.format("csv")\
  .option("header","true").option("maxRecordsPerFile",100000)\
  .mode("overwrite")\
  .save("~/under 10M rows csv file")

7-1. "Start Date"컬럼을 기준으로 partition 형태로 저장

  • 년도-월-일 별로 patition으로 저장(3개 partition key)
  • 단, 저장 시에 원본의 컬럼은 모두 유지되어야 함
 
df.withColumn("Start_Date",F.from_unixtime(F.unix_timestamp("Start_Date","MM/dd/yyyy HH:mm")))\
  .withColumn("Year",F.substring("Start_Date",1,4))\
  .withColumn("Month",F.substring("Start_Date",6,2))\
  .withColumn("Day",F.substring("Start_Date",9,2))\
  .write.format("parquet")\
  .mode("overwrite")\
  .partitionBy("Year","Month","Day")\
  .save("~/3 Partitions")​

7-2. partition으로 저장한 파일을 read

  • 년도, 월, 일 조건 입력 시 partition pruning이 발생하는지 확인
  • 저장한 파일을 df로 적재하여, 특정 조건만 추출하는 spark program 테스트를 진행
  • partition prunning의 발생 여부 확인은 어떤 방법을 통해 알 수 있는지
spark.read.format("parquet").load("~/3 Partitions")\
     .where((F.col("Year")==2014) & (F.col("Subscriber_Type")=="Customer")).explain("true")
     
"""
아래의 Pysical Plan을 살펴보면 
PartitionFilters는 스캔 할 파티션을 명시하고 있고,
PushedFilters는 스캔하기 전 필터가 되는 컬럼과 값을 명시하고 있다.
"""

 

728x90