BigData/Spark & Spark Tuning
[Spark] PySpark read & write + partitioning 간단한 예시문제
스파이디웹
2022. 8. 23. 23:58
728x90
python read&write 과 partition에 대해
아래의 링크에서 받은 파일로 다음과 같은 문제를 만들었습니다.
https://github.com/udacity/data-analyst/blob/master/projects/bike_sharing/201508_trip_data.csv
혹은
아래의 링크에서 201508_trip_data.csv 파일 다운로드 후 Spark에서 Data Read
https://github.com/Spidyweb-3588/python_skillup/tree/main/201508data
문법적인 자유도에 관한 문제를 보시려면 아래의 링크를 참조해주세요
2022.03.26 - [BigData/Spark & Spark Tuning] - [Spark] 스파크의 문법적 자유도, 스키마 조작, dummy 생성
PySpark Read & Write 문제를 풀기 위해 필요한 코드
- Spark session생성
- 필요한 라이브러리 호출
- csvfile read
import findspark#필요한 라이브러리
findspark.init()
from pyspark.sql import SparkSession#필요한 라이브러리
from pyspark.sql import functions as F#필요한 라이브러리
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()#sparksession local로 생성
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)#JupyterNotebook(lab)에서만 적용되는 config
#각자 csv파일 로케이션에 따라 file을 read한다.
df = spark.read.format("csv").option("header","true").option("inferschema","true").load("~/location/file.csv")
read & write with partition 조작 문제
1. 데이터를 읽어서 parquet 형식 파일로 저장
df.write.format("parquet")\
.mode("overwrite")\
.save("~/transform to parquet")
2. Start Station 별로 partition 하여 저장
- partition이 된다는 것은 partition으로 지정된 column의 unique한 값으로 sub directory가 구분이 되고 그 안에 파일이 써진다는 것
df.write.format("parquet")\
.mode("overwrite")\
.partitionBy("Start_Station")\
.save("~/parquet with partitions")
3. (non-partition으로) 데이터 셋을 1개의 parquet 파일로 저장
#output partition 조작
df.coalesce(1).write.format("parquet")\
.mode("overwrite")\
.save("~/one parquet file")
4. (non-partition으로) 데이터 셋을 5개의 파일로 나누어 저장
#output partition 조작
df.coalesce(5).write.format("parquet")\
.mode("overwrite")\
.save("~/five parquet file")
5. csv로 적재된 원본 데이터 셋을 partition당 사이즈가 10MB를 넘지 않도록 나누어 data frame에 적재
#input partition 조작
spark.conf.set("spark.sql.files.maxPartitionBytes",1000000)
df.write.format("csv")\
.mode("overwrite")\
.save("~/under 10MB csv file")
6. (non-partition으로) 데이터 셋을 파일당 10만개 Row를 보관하도록 저장
df.repartition(1).write.format("csv")\
.option("header","true").option("maxRecordsPerFile",100000)\
.mode("overwrite")\
.save("~/under 10M rows csv file")
7-1. "Start Date"컬럼을 기준으로 partition 형태로 저장
- 년도-월-일 별로 patition으로 저장(3개 partition key)
- 단, 저장 시에 원본의 컬럼은 모두 유지되어야 함
df.withColumn("Start_Date",F.from_unixtime(F.unix_timestamp("Start_Date","MM/dd/yyyy HH:mm")))\
.withColumn("Year",F.substring("Start_Date",1,4))\
.withColumn("Month",F.substring("Start_Date",6,2))\
.withColumn("Day",F.substring("Start_Date",9,2))\
.write.format("parquet")\
.mode("overwrite")\
.partitionBy("Year","Month","Day")\
.save("~/3 Partitions")
7-2. partition으로 저장한 파일을 read
- 년도, 월, 일 조건 입력 시 partition pruning이 발생하는지 확인
- 저장한 파일을 df로 적재하여, 특정 조건만 추출하는 spark program 테스트를 진행
- partition prunning의 발생 여부 확인은 어떤 방법을 통해 알 수 있는지
spark.read.format("parquet").load("~/3 Partitions")\
.where((F.col("Year")==2014) & (F.col("Subscriber_Type")=="Customer")).explain("true")
"""
아래의 Pysical Plan을 살펴보면
PartitionFilters는 스캔 할 파티션을 명시하고 있고,
PushedFilters는 스캔하기 전 필터가 되는 컬럼과 값을 명시하고 있다.
"""
728x90