본문 바로가기
BigData/Spark & Spark Tuning

[Spark] 스파크의 문법적 자유도, 스키마 조작, dummy 생성

by 스파이디웹 2022. 3. 26.
728x90

이번 포스트에서 다룰 것

  1. 스파크의 다양한 문법을 소개
  2. 스키마를 정의하여 dataframe 만들기
  3. dummy데이터 생성

아래의 링크에서 201508_trip_data.csv 파일 다운로드 후 Spark에서 Data Read

https://github.com/Spidyweb-3588/python_skillup/tree/main/201508data

 

read&write + 파티션 조작에 관한 간단한 문제예시를 보시려면 아래의 링크를 참조해주세요.

2022.08.23 - [BigData/Spark & Spark Tuning] - [Spark] PySpark read & write + partitioning 간단한 예시문제


1. Start_Station,End_Station 컬럼을 4가지 방법으로 조회

#1.
csvfile.select("Start_Station","End_Station")

#2.
from pyspark.sql import functions as F
csvfile.select(F.col("Start_Station"),F.col("End_Station"))

#3.
csvfile.select(csvfile["Start_Station"],csvfile["End_Station"])

#4.
csvfile.select(csvfile.Start_Station,csvfile.End_Station)

2. Dataframe에 alias(별칭)을 붙여서 Start_Station,End_Station 컬럼을 2가지 방법으로 조회

#1.
csvfile.alias("a").select("a.Start_Station","a.End_Station")

#2.
csvfile.alias("a").select(F.col("a.Start_Station"),F.col("a.End_Station"))

3. selectExpr(), select(expr())를 사용하여 Station,End_Station 컬럼을 조회

#1.selectExpr()사용
csvfile.selectExpr("Start_Station","End_Station")

#2.select(expr())사용
csvfile.select(F.expr("Start_Station"),F.expr("End_Station"))

4. 동일한 사용법의 다양한 메소드

#where, filter
#filter가 구버전 method, where가 신버전 method이다.
csvfile.where(F.col("Zip_Code") > F.lit(2000))
csvfile.filter(F.col("Zip_Code") > F.lit(2000))
#sort, orderBy
#sort가 구버전 method, orderBy가 신버전 method이다.
csvfile.orderBy(F.col("Zip_Code"))
csvfile.sort(F.col("Zip_Code"))

5. Dataframe을 Temp View로 생성

csvfile.createOrReplaceTempView("csvfile")

생성한 table 목록 확인(메타스토어 확인)

spark.sql("show tables")


6. 위에서 생성한 Temp View 데이터 조회

spark.sql("select * from csvfile")


7. Dummy Data 생성

  • ('#','#','0','0','#','현재 날짜')를 dummy data로 입력
  • 3번째 0은 StringType, 4번째 0은 IntegerType, 6번째 값은 현재의 시간을 yyyyMMddHHmmss의 format으로 나타낸 것
#1.Row -> DataFrame 사용
from pyspark.sql import Row
dummy_rows=[Row("#","#","0",0,"#")]
dummy=spark.createDataFrame(dummy_rows).withColumn("lod_dtm",F.from_unixtime(F.unix_timestamp(),'yyyyMMddHHmmss'))

#2.sparkSQL 사용
dummy_sparksql =\
spark.sql("select '#' as col1\
                  ,'#' as col2\
                  ,'0' as col3\
                  ,0 as col4\
                  ,'#' as col5\
                  ,from_unixtime(unix_timestamp(),'yyyyMMddHHmmss') as lod_dtm")


8. 새로운 Schema 생성 후, Data Read시 적용

스파크는 빅데이터의 컨셉인 '일단 데이터를적재하고 나중에(읽을 때) 스키마를 입힌다'를 지원합니다.(하이브 메타 스토어와 같습니다.)

  • Integer type인 컬럼을 Long type으로 변환
  • 컬럼명의 끝에 컬럼의 데이터 타입을 명시( ex. Trip_ID → Trip_ID_Long, Start_Date → Start_Date_String)
  • 기존 스키마 정보

from pyspark.sql.types import StructType, StructField, LongType, StringType
MySchema = StructType([
                           StructField("Trip_ID_Long",LongType(),True),
                           StructField("Duration_Long",LongType(),True),
                           StructField("Start_Date_String",StringType(),True),
                           StructField("Start_Station_String",StringType(),True),
                           StructField("Start_Terminal_Long",LongType(),True),
                           StructField("End_Date_String",StringType(),True),
                           StructField("End_Station_String",StringType(),True),
                           StructField("End_Terminal_Long",LongType(),True),
                           StructField("Bike_#_Long",LongType(),True),
                           StructField("Subscriber_Type_String",StringType(),True),
                           StructField("Zip_Code_String",StringType(),True)
                       ]
                     )
                     
csvfile_MySchema=\
spark.read.format("csv").option("header","true").schema(MySchema).load("~/201508_trip_data.csv")

csvfile_MySchema.printSchema()

728x90

댓글