728x90
이번 포스트에서 다룰 것
- 스파크의 다양한 문법을 소개
- 스키마를 정의하여 dataframe 만들기
- dummy데이터 생성
아래의 링크에서 201508_trip_data.csv 파일 다운로드 후 Spark에서 Data Read
https://github.com/Spidyweb-3588/python_skillup/tree/main/201508data
read&write + 파티션 조작에 관한 간단한 문제예시를 보시려면 아래의 링크를 참조해주세요.
2022.08.23 - [BigData/Spark & Spark Tuning] - [Spark] PySpark read & write + partitioning 간단한 예시문제
1. Start_Station,End_Station 컬럼을 4가지 방법으로 조회
#1.
csvfile.select("Start_Station","End_Station")
#2.
from pyspark.sql import functions as F
csvfile.select(F.col("Start_Station"),F.col("End_Station"))
#3.
csvfile.select(csvfile["Start_Station"],csvfile["End_Station"])
#4.
csvfile.select(csvfile.Start_Station,csvfile.End_Station)
2. Dataframe에 alias(별칭)을 붙여서 Start_Station,End_Station 컬럼을 2가지 방법으로 조회
#1.
csvfile.alias("a").select("a.Start_Station","a.End_Station")
#2.
csvfile.alias("a").select(F.col("a.Start_Station"),F.col("a.End_Station"))
3. selectExpr(), select(expr())를 사용하여 Station,End_Station 컬럼을 조회
#1.selectExpr()사용
csvfile.selectExpr("Start_Station","End_Station")
#2.select(expr())사용
csvfile.select(F.expr("Start_Station"),F.expr("End_Station"))
4. 동일한 사용법의 다양한 메소드
#where, filter
#filter가 구버전 method, where가 신버전 method이다.
csvfile.where(F.col("Zip_Code") > F.lit(2000))
csvfile.filter(F.col("Zip_Code") > F.lit(2000))
#sort, orderBy
#sort가 구버전 method, orderBy가 신버전 method이다.
csvfile.orderBy(F.col("Zip_Code"))
csvfile.sort(F.col("Zip_Code"))
5. Dataframe을 Temp View로 생성
csvfile.createOrReplaceTempView("csvfile")
생성한 table 목록 확인(메타스토어 확인)
spark.sql("show tables")
6. 위에서 생성한 Temp View 데이터 조회
spark.sql("select * from csvfile")
7. Dummy Data 생성
- ('#','#','0','0','#','현재 날짜')를 dummy data로 입력
- 3번째 0은 StringType, 4번째 0은 IntegerType, 6번째 값은 현재의 시간을 yyyyMMddHHmmss의 format으로 나타낸 것
#1.Row -> DataFrame 사용
from pyspark.sql import Row
dummy_rows=[Row("#","#","0",0,"#")]
dummy=spark.createDataFrame(dummy_rows).withColumn("lod_dtm",F.from_unixtime(F.unix_timestamp(),'yyyyMMddHHmmss'))
#2.sparkSQL 사용
dummy_sparksql =\
spark.sql("select '#' as col1\
,'#' as col2\
,'0' as col3\
,0 as col4\
,'#' as col5\
,from_unixtime(unix_timestamp(),'yyyyMMddHHmmss') as lod_dtm")
8. 새로운 Schema 생성 후, Data Read시 적용
스파크는 빅데이터의 컨셉인 '일단 데이터를적재하고 나중에(읽을 때) 스키마를 입힌다'를 지원합니다.(하이브 메타 스토어와 같습니다.)
- Integer type인 컬럼을 Long type으로 변환
- 컬럼명의 끝에 컬럼의 데이터 타입을 명시( ex. Trip_ID → Trip_ID_Long, Start_Date → Start_Date_String)
- 기존 스키마 정보
from pyspark.sql.types import StructType, StructField, LongType, StringType
MySchema = StructType([
StructField("Trip_ID_Long",LongType(),True),
StructField("Duration_Long",LongType(),True),
StructField("Start_Date_String",StringType(),True),
StructField("Start_Station_String",StringType(),True),
StructField("Start_Terminal_Long",LongType(),True),
StructField("End_Date_String",StringType(),True),
StructField("End_Station_String",StringType(),True),
StructField("End_Terminal_Long",LongType(),True),
StructField("Bike_#_Long",LongType(),True),
StructField("Subscriber_Type_String",StringType(),True),
StructField("Zip_Code_String",StringType(),True)
]
)
csvfile_MySchema=\
spark.read.format("csv").option("header","true").schema(MySchema).load("~/201508_trip_data.csv")
csvfile_MySchema.printSchema()
728x90
댓글