[Python] pandas vs pyspark 사용 및 코드 비교
이번 포스트에는 pyspark과 pandas를 쓰임새를 비교 해보고,
pyspark의 코드를 pandas코드로 옮겨보고 비교해보겠습니다.
1. Pandas vs Pyspark 언제쓸까?
- pandas와 spark는 dataframe이라는 SQL 테이블 또는 Excel 스프레드 시트에서와 같이 이질적으로 유형이 지정된 열이있는 데이블 형식 데이터를 다루게 될 때 아주 적합함
- 대부분의 데이터처리에는 pandas 라이브러리를 통해 해결했었는데, pandas와 spark의 가장 큰 차이는 핸들링하는 데이터의 양의 크기
- pandas는 통상적으로 spark보다 작은 데이터 처리에 대해서는 속도가 빠르다고 하는데, 10GB이상의 파일 처럼 파일의 크기가 커지게 되면, pandas는 메모리용량의 부족으로 인해 성능이 저하됨
pandas | spark | |
공통점 | dataframe을 통해 데이터 처리를 함 | |
차이점 | 적은 양의 데이터 일 때 성능이 좋다(10GB내) | 큰 양의 데이터일 때 성능이 좋다(TB,PB급 데이터) |
2. Pandas VS Pyspark 코드 예제
데이터 소스
https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv?accessType=DOWNLOAD
pyspark
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
#sparksession 드라이버 프로세스 얻기
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
df_crash = spark.read.format("csv").option("header","true").option("inferschema","true")\
.load("경로/Motor_Vehicle_Collisions_-_Crashes.csv")
#날짜와 시간이 yyyy/MM/dd 와 HH:mm 형식으로 바뀌고, 시간대가 00:00~06:00시에 일어난 사건 중에 지역이 'MANHATTAN'인 곳의 날짜별 시간별
df_crash_losses=df_crash\
.withColumn("CRASH_DATE_FORMATTED",F.from_unixtime(F.unix_timestamp(F.col("CRASH DATE"),"MM/dd/yyyy"),"yyyy/MM/dd"))\
.withColumn("CRASH_TIME_HH",F.lpad(F.col("CRASH TIME"),5,"0"))\
.where(F.col("CRASH_TIME_HH").between("00:00","06:00") & F.col("BOROUGH").isin("MANHATTAN"))\
.groupBy(F.col("CRASH_DATE_FORMATTED"),F.col("CRASH_TIME_HH"))\
.agg(F.max(F.col("NUMBER OF PERSONS INJURED")).alias("TOTAL_INJURED")
,F.max(F.col("NUMBER OF PERSONS KILLED")).alias("TOTAL_KILLED"))
#날짜별 부상자 숫자 순위를 메기는 컬럼과 날짜별 사망자 숫자 순위를 메기는 컬럼을 생성해서 날짜별 부상자수가 1위거나 사망자수가 1위인 시간,날짜,컬럼조회
df_crash_or=df_crash_losses\
.withColumn("MAX_INJURED_DESC_RN",F.row_number().over(Window.partitionBy(F.col("CRASH_DATE_FORMATTED")).orderBy((F.col("TOTAL_INJURED")))))\
.withColumn("MAX_KILLED_DESC_RN",F.row_number().over(Window.partitionBy(F.col("CRASH_DATE_FORMATTED")).orderBy((F.col("TOTAL_KILLED")))))\
.where((F.col("MAX_INJURED_DESC_RN") <= 1) | (F.col("MAX_KILLED_DESC_RN") <= 1))\
.select(F.col("CRASH_DATE_FORMATTED")
,F.col("CRASH_TIME_HH")
,F.col("TOTAL_INJURED")
,F.col("TOTAL_KILLED"))\
.orderBy(F.col("CRASH_DATE_FORMATTED")
,F.col("CRASH_TIME_HH"))
df_crash_or.show()
#spark session
spark.stop()
pandas
import pandas as pd
pd.set_option('display.max_columns', None)
df_crash = pd.read_csv("경로/Motor_Vehicle_Collisions_-_Crashes.csv")
df_crash['CRASH_DATE_FORMATTED'] = pd.to_datetime(df_crash['CRASH DATE']).dt.strftime('%Y/%m/%d')
df_crash['CRASH_TIME_HH'] = df_crash['CRASH TIME'].str.pad(width=5, side='left', fillchar='0')
df_crash_losses = df_crash[(df_crash['CRASH_TIME_HH'].between("00:00","06:00")) & (df_crash['BOROUGH'].isin(['MANHATTAN']))]\
.groupby(['CRASH_DATE_FORMATTED','CRASH_TIME_HH'],as_index=False).agg(TOTAL_INJURED=pd.NamedAgg(column='NUMBER OF PERSONS INJURED',aggfunc='max')
,TOTAL_KILLED=pd.NamedAgg(column='NUMBER OF PERSONS KILLED',aggfunc='max'))
df_crash_losses['MAX_INJURED_DESC_RN'] = df_crash_losses.groupby('CRASH_DATE_FORMATTED',as_index=False)['TOTAL_INJURED'].rank(method='first', ascending=False).astype('int')
df_crash_losses['MAX_KILLED_DESC_RN'] = df_crash_losses.groupby('CRASH_DATE_FORMATTED',as_index=False)['TOTAL_KILLED'].rank(method='first', ascending=False).fillna(0).astype('int')
df_crash_or = df_crash_losses[((df_crash_losses['MAX_INJURED_DESC_RN'] <= 1) | (df_crash_losses['MAX_KILLED_DESC_RN'] <= 1))]
df_crash_or = df_crash_or[['CRASH_DATE_FORMATTED','CRASH_TIME_HH','TOTAL_INJURED','TOTAL_KILLED','MAX_INJURED_DESC_RN','MAX_KILLED_DESC_RN']]
df_crash_or.sort_values(by=['CRASH_DATE_FORMATTED','CRASH_TIME_HH'])
print(df_crash_or)
3. pandas의 데이터 처리 방법
1) pandas의 시간데이터
pandas에서는 시간데이터를 다룰 때 to_datetime, dt.strftime('%Y/%m/%d') 과 같이 사용
https://pandas.pydata.org/docs/reference/api/pandas.to_datetime.html
# mm-dd-yyyy, dd-mm-yyyy, mm/dd/yyyy, dd/mm/yyyy 와 같은 형태를 datetime format인 yyyy-mm-dd로 변경해줌
# to_datetime(df.컬럼이름,dayfirst=,monthfirst=,yearfirst=) 컬럼이름 다음의 인수들로 연월일 중 어떤 것이 먼저 나오는지 지정할 수 있다.
df['datetime_format'] = pd.to_datetime(df.컬럼이름)
df['user_format'] = df.dt.strftime('%Y/%m/%d')
# 위의 두 줄을 합쳐 다음과 같이 만들 수도 있다.
df['datetime_user_format'] = pd.to_datetime(df.컬럼이름).dt.strftime('%Y/%m/%d')
2) pandas의 데이터 채우기
pandas 에서는 데이터의 빈공간을 채울 때 series.str.pad()를 사용
https://pandas.pydata.org/docs/reference/api/pandas.Series.str.pad.html
# Series.str.pad(width, side='left', fillchar=' ')
# df_crash 데이터프레임의 CRASH TIME 열 데이터를 5길이까지 왼쪽으로 0을 채워넣는 코드
df_crash['CRASH_TIME_HH'] = df_crash['CRASH TIME'].str.pad(width=5, side='left', fillchar='0')
pandas에서는 데이터의 결측치를 채울 때 df.fillna()를 사용
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.fillna.html
df_crash_losses['MAX_KILLED_DESC_RN'] = df_crash_losses.groupby('CRASH_DATE_FORMATTED',as_index=False)['TOTAL_KILLED'].rank(method='first', ascending=False).fillna(0).astype('int')
3) pandas에서의 형변환
새로운 컬럼을 생성하는 방법
https://pandas.pydata.org/docs/reference/api/pandas.to_numeric.html#pandas.to_numeric
https://pandas.pydata.org/docs/reference/api/pandas.to_datetime.html
df['newcol'] = pd.to_numeric(df['existcol'])
df['newcol2'] = pd.to_datetime(df['existcol2'])
astype() 메소드 사용
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.astype.html
# astype은 null값을 형변환 할 수 없으므로 null,NaN에 대해 처리를 해준 후 사용해야 한다.
df2 = df1['col'].astype('int')
convert_dtype() 메소드 사용
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.convert_dtypes.html
# convert_dtypes 메서드는 열의 요소가 혼합된 dtype일 경우, 열의 요소를 같은 dtype으로 통일할 수 있는 가장 합리적인 형식을 갖는 pd.NA로 변환
# 데이터프레임의 NA가 포함된 컬럼들 중 가장 맞는 데이터타입으로 변환
df2 = df.convert_dtype()
4) pandas에서의 조건절
pandas에서의 where조건절 사용 (특정 시간 사이, and 구문, isin)
# pandas에서의 특정 시간사이는 pyspark과 마찬가지로 .between()을 사용하여 표현한다.
# pandas에서의 and 구문은 &을 사용하여 표현한다.
# pandas에서의 isin() 도 pyspark과 마찬가지로 .isin([])와 같이 사용한다.
# df_crash[조건] 과 같이 df를 불러와 df_crash에 조건에 맞는 df를 새롭게 저장하는 개념
df_crash = df_crash[(df_crash['CRASH_TIME_HH'].between("00:00","06:00")) & (df_crash['BOROUGH'].isin(['MANHATTAN']))]
5) 윈도우 함수 row_number() over(partition by col1 order by col2) 구현
# df.groupby('partitionbycol',as_index=False)['orderbycol'].rank('first')
# groupby를 사용하면 기본으로 그룹 라벨이 index가 됨, index를 사용하고 싶은 않은 경우에는 as_index=False 를 설정
# rank()의 method='first'로 구현해야 row_number asending=False 로 하면 내림차순정렬
df_crash_losses['MAX_INJURED_DESC_RN'] = df_crash_losses.groupby('CRASH_DATE_FORMATTED',as_index=False)['TOTAL_INJURED'].rank(method='first', ascending=False).astype('int')
6) pandas 에서의 NA, Null, NaN, None, inf
NA: Not Available의 약자로 누락된 데이터 = 결측값을 의미. 여기에는 NaN, None이 모두 포함된 개념
NaN: Not a Number의 약자로 숫자 형태의 누락된 데이터 = 결측값을 표현
None: 파이썬에서 누락된 데이터 = 결측값을 표현
null: NA와 동일하게 누락된 데이터를 의미
inf: infinite의 약자로 무한대를 의미
pandas에서의 None,inf
숫자형 | 문자형 | |
None | NaN | None |
inf | inf | inf |
pandas에서 NA == null == None이며 NaN은 숫자 데이터에 한해 표현되는 방식
그 예로 .isnull() 은 .isna()의 별칭(alias)와도 같이 사용된다.
7) pandas 와 pyspark의 OR, AND 처리
DESC_RN1, DESC_RN2가 있다고 가정,
where(DESC_RN1 <=1 & DESC_RN2<=1)
where(DESC_RN1 <=1 | DESC_RN2<=1)
위의 두개의 조건문이 있을 때 처리되는 방법이 각각 다르다.
pandas | pyspark | |
OR | DESC_RN1이 1이하거나 DESC_RN2가 1이하거나 2개이상이 나올 수 있음 | DESC_RN1이 1이하인 것만 나오거나, DESC_RN2이 1이하인 것만 나오거나, DESC_RN1,DESC_RN2가 둘다 1이하인것만 나옴 |
AND | 둘 다 1이하인 것만 나오지만 ROW_NUMBER를 통해 동일 값에 대해 순위를 매기는 체계가 달라 동일한 쿼리라도 총 개수가 달라짐 |
8) pandas row,column 전부 출력하기
Method 1:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
Method 2:
pd.options.display.max_columns = None
pd.options.display.max_rows = None
참조: