BigData/Spark & Spark Tuning

[Spark Tuning] CSV vs Parquet(columnar format) in spark 성능비교하기

스파이디웹 2023. 1. 25. 17:03
728x90

[File 크기]

컬럼 수: 21개의 컬럼

레코드 수: 총 3,647,595 rows(records)

 

csv: 578MB

parquet: 44.7MB

(gz.parquet: 34.6MB)

 

[비교 관점]

spark에서의 성능이란 file을 스캔할 때 스캔한 양(읽어들인 양)과 스캔시간이 중요

 

[CSV vs Parquet 특징 비교] (json은 덤ㅎㅎ)


1. CSV

csv는 일반적인 text, 즉 row단위로 읽는 file format

1) 1개의 column select

df_csv.select(F.col("Exam_No")).show(100000)

Physical plan

== Physical Plan ==
CollectLimit (3)
+- * Project (2)
   +- Scan csv  (1)


(1) Scan csv 
Output [1]: [Exam_No#16]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/유저이름/Downloads/origin_csv]
ReadSchema: struct<Exam_No:int>

(2) Project [codegen id : 1]
Output [1]: [cast(Exam_No#16 as string) AS Exam_No#102]
Input [1]: [Exam_No#16]

(3) CollectLimit
Input [1]: [Exam_No#102]
Arguments: 100001

Stage

* 100,000rows가 아닌 4096row만 select한다면?

지정한 records가 줄면 scan한 데이터(input size)도 적어진다.


2) 전체 컬럼 select

df_csv.select("*").show(100000)

Stage


3) 결론

  • 하나의 컬럼과 전체컬럼의 10만 rows를 읽을 때 스캔된(읽어들인) 양은 15.9MiB로 동일
  • 지정한 row 수가 달라지면 파일을  scan(input size)하는 데이터 양이 달라진다. (100,000개 15.9MiB, 4096개 712KiB)
  • 하나의 컬럼과 전체컬럼의 10만 rows를 읽을 때 소요된 시간은 읽어들인 데이터의 크기자체가 매우 적기 때문에, 시간 또한 매우 적게 나오게되므로 차이가 나진 않지만, 같은 크기의 데이터를 스캔했으므로, 동일할 것으로 보임(네트워크 등 다른 지연시간을 제외한 순수 작업에 걸린시간)
  • row 기반 컬럼 특징상 지정한 row 수에 따라 스캔에 소요되는 시간이 달라지게 됨

2. Parquet

1) 1개의 컬럼 select

df_parquet.select(F.col("Exam_No")).show(100000)

Physical plan

== Physical Plan ==
CollectLimit (3)
+- * Filter (2)
   +- Scan text  (1)


(1) Scan text 
Output [1]: [value#0]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/유저이름/Downloads/origin_csv/part-00000-0476ec4f-9d04-40c3-8038-b5ad47c08648-c000.csv, ... 7 entries]
ReadSchema: struct<value:string>

(2) Filter [codegen id : 1]
Input [1]: [value#0]
Condition : (length(trim(value#0, None)) > 0)

(3) CollectLimit
Input [1]: [value#0]
Arguments: 1

 

parquet는 csv와 다르게 scan time이 나오게 되는데, 필요한 데이터의 parquet file을 스캔하면서 소요된 시간을 의미

== Physical Plan ==
CollectLimit (4)
+- * Project (3)
   +- * ColumnarToRow (2)
      +- Scan parquet  (1)


(1) Scan parquet 
Output [1]: [Exam_No#58]
Batched: true
Location: InMemoryFileIndex [file:/C:/Users/유저이름/Downloads/origin_parquet]
ReadSchema: struct<Exam_No:int>

(2) ColumnarToRow [codegen id : 1]
Input [1]: [Exam_No#58]

(3) Project [codegen id : 1]
Output [1]: [cast(Exam_No#58 as string) AS Exam_No#102]
Input [1]: [Exam_No#58]

(4) CollectLimit
Input [1]: [Exam_No#102]
Arguments: 100001

 

Stage

 

*1batch 단위의 records(4096rows)를 show()한다면?

columnar특성 상 지정한 record 수가 한 개의 물리 파일 내에 존재한다면, 해당 파일의 한 개의 컬럼 만을 스캔 하기 때문에, 레코드 수와 관계없이 동일한 양의 스캔을 함(405.9KiB)

 

*1개가 아닌 2개의 column을 show()한다면?

columnar 특성 상 지정한 column수가 늘어나면 input size도 그에 맞게 증가 함


2) 전체컬럼 select

df_parquet.select("*").show(100000)
"""
df_parquet.select(F.col("Exam_No"),
                  F.col("List_No"),
                  F.col("First_Name"),
                  F.col("MI"),
                  F.col("Last_Name"),
                  F.col("List_Agency_code"),
                  F.col("List_Agency_Desc"),
                  F.col("List_Title_Desc"),
                  F.col("Cert_Seq_No"),
                  F.col("Request_Date"),
                  F.col("Salary"),
                  F.col("Cert_Issue_No"),
                  F.col("Cert_Date"),
                  F.col("Reissue_Date"),
                  F.col("Cert_Expiration_Date"),
                  F.col("No_Certified"),
                  F.col("No_Requested"),
                  F.col("Provisional_Replacement"),
                  F.col("No_Vacancies"),
                  F.col("List_Title_Code"),
                  F.col("Sel_Cert_Description")).show(100000)
일일히 지정하나, select("*")하나 카탈리스트 옵티마이저의 동일한 쿼리플랜에 의해 동일하게 적용 됨
"""

Physical plan

== Physical Plan ==
CollectLimit (3)
+- * Filter (2)
   +- Scan text  (1)


(1) Scan text 
Output [1]: [value#0]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/유저이름/Downloads/origin_csv/part-00000-0476ec4f-9d04-40c3-8038-b5ad47c08648-c000.csv, ... 7 entries]
ReadSchema: struct<value:string>

(2) Filter [codegen id : 1]
Input [1]: [value#0]
Condition : (length(trim(value#0, None)) > 0)

(3) CollectLimit
Input [1]: [value#0]
Arguments: 1

text를 scan한 양은 말그대로 글자의 크기를 스캔한 크기라서 csv,parquet와 무관하게 데이터 크기는 동일

parquet는 csv와 다르게 scan time이 나오게 되는데, 필요한 데이터의 parquet file을 스캔하면서 소요된 시간을 의미

== Physical Plan ==
CollectLimit (4)
+- * Project (3)
   +- * ColumnarToRow (2)
      +- Scan parquet  (1)


(1) Scan parquet 
Output [21]: [Exam_No#58, List_No#59, First_Name#60, MI#61, Last_Name#62, List_Agency_code#63, List_Agency_Desc#64, List_Title_Desc#65, Cert_Seq_No#66, Request_Date#67, Salary#68, Cert_Issue_No#69, Cert_Date#70, Reissue_Date#71, Cert_Expiration_Date#72, No_Certified#73, No_Requested#74, Provisional_Replacement#75, No_Vacancies#76, List_Title_Code#77, Sel_Cert_Description#78]
Batched: true
Location: InMemoryFileIndex [file:/C:/Users/유저이름/Downloads/origin_parquet]
ReadSchema: struct<Exam_No:int,List_No:double,First_Name:string,MI:string,Last_Name:string,List_Agency_code:int,List_Agency_Desc:string,List_Title_Desc:string,Cert_Seq_No:double,Request_Date:string,Salary:double,Cert_Issue_No:int,Cert_Date:string,Reissue_Date:string,Cert_Expiration_Date:string,No_Certified:int,No_Requested:int,Provisional_Replacement:string,No_Vacancies:int,List_Title_Code:int,Sel_Cert_Description:string>

(2) ColumnarToRow [codegen id : 1]
Input [21]: [Exam_No#58, List_No#59, First_Name#60, MI#61, Last_Name#62, List_Agency_code#63, List_Agency_Desc#64, List_Title_Desc#65, Cert_Seq_No#66, Request_Date#67, Salary#68, Cert_Issue_No#69, Cert_Date#70, Reissue_Date#71, Cert_Expiration_Date#72, No_Certified#73, No_Requested#74, Provisional_Replacement#75, No_Vacancies#76, List_Title_Code#77, Sel_Cert_Description#78]

(3) Project [codegen id : 1]
Output [21]: [cast(Exam_No#58 as string) AS Exam_No#142, cast(List_No#59 as string) AS List_No#143, First_Name#60, MI#61, Last_Name#62, cast(List_Agency_code#63 as string) AS List_Agency_code#147, List_Agency_Desc#64, List_Title_Desc#65, cast(Cert_Seq_No#66 as string) AS Cert_Seq_No#150, Request_Date#67, cast(Salary#68 as string) AS Salary#152, cast(Cert_Issue_No#69 as string) AS Cert_Issue_No#153, Cert_Date#70, Reissue_Date#71, Cert_Expiration_Date#72, cast(No_Certified#73 as string) AS No_Certified#157, cast(No_Requested#74 as string) AS No_Requested#158, Provisional_Replacement#75, cast(No_Vacancies#76 as string) AS No_Vacancies#160, cast(List_Title_Code#77 as string) AS List_Title_Code#161, Sel_Cert_Description#78]
Input [21]: [Exam_No#58, List_No#59, First_Name#60, MI#61, Last_Name#62, List_Agency_code#63, List_Agency_Desc#64, List_Title_Desc#65, Cert_Seq_No#66, Request_Date#67, Salary#68, Cert_Issue_No#69, Cert_Date#70, Reissue_Date#71, Cert_Expiration_Date#72, No_Certified#73, No_Requested#74, Provisional_Replacement#75, No_Vacancies#76, List_Title_Code#77, Sel_Cert_Description#78]

(4) CollectLimit
Input [21]: [Exam_No#142, List_No#143, First_Name#60, MI#61, Last_Name#62, List_Agency_code#147, List_Agency_Desc#64, List_Title_Desc#65, Cert_Seq_No#150, Request_Date#67, Salary#152, Cert_Issue_No#153, Cert_Date#70, Reissue_Date#71, Cert_Expiration_Date#72, No_Certified#157, No_Requested#158, Provisional_Replacement#75, No_Vacancies#160, List_Title_Code#161, Sel_Cert_Description#78]
Arguments: 100001

 

Stage


3) 결론

  • parquet는 batch단위(여기서는 4096records)로 records를 읽음(columnarReaderBatchSize의 default 값이 4096)
    → 100,000 rows를 요청해도 4096개 단위인 102,400 records를 불러옴
  • columnar특성 상 지정한 record 수가 한 개의 물리 파일 내에 존재한다면, 해당 파일의 한 개의 컬럼 만을 스캔 하기 때문에, 레코드 수와 관계없이 동일한 양의 스캔을 함
    → 파일의 전체 레코드 수를 기준으로 스캔 후 요청한 양 만큼만 보여줌
  • columnar 특성 상 지정한 컬럼 수가 많으면 스캔하는 데이터의 크기도 늘어 남 1개(405.0KiB) 2개(1.3MiB) 19개(7.3MiB)
  • Physical plan에서 1개의 컬럼과 모든 컬럼(21개)의 scan time(읽어들이는 시간)의 차이는 245ms vs 1.4s, columnar 는 지정한 컬럼만 읽어 들임
728x90