language/Python

[Python] pandas vs pyspark 사용 및 코드 비교

스파이디웹 2022. 9. 22. 12:52
728x90

이번 포스트에는 pyspark과 pandas를 쓰임새를 비교 해보고,

pyspark의 코드를 pandas코드로 옮겨보고 비교해보겠습니다.


1. Pandas vs Pyspark 언제쓸까?

  • pandas와 spark는 dataframe이라는 SQL 테이블 또는 Excel 스프레드 시트에서와 같이 이질적으로 유형이 지정된 열이있는 데이블 형식 데이터를 다루게 될 때 아주 적합함
  • 대부분의 데이터처리에는 pandas 라이브러리를 통해 해결했었는데, pandas와 spark의 가장 큰 차이는 핸들링하는 데이터의 양의 크기
  • pandas는 통상적으로 spark보다 작은 데이터 처리에 대해서는 속도가 빠르다고 하는데, 10GB이상의 파일 처럼 파일의 크기가 커지게 되면, pandas는 메모리용량의 부족으로 인해 성능이 저하됨
  pandas spark
공통점 dataframe을 통해 데이터 처리를 함
차이점 적은 양의 데이터 일 때 성능이 좋다(10GB내) 큰 양의 데이터일 때 성능이 좋다(TB,PB급 데이터)

 


2. Pandas VS Pyspark 코드 예제

데이터 소스

https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv?accessType=DOWNLOAD

pyspark

import findspark
findspark.init()
 
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
#sparksession 드라이버 프로세스 얻기
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
 
df_crash = spark.read.format("csv").option("header","true").option("inferschema","true")\
                .load("경로/Motor_Vehicle_Collisions_-_Crashes.csv")
 
#날짜와 시간이 yyyy/MM/dd 와 HH:mm 형식으로 바뀌고, 시간대가 00:00~06:00시에 일어난 사건 중에 지역이 'MANHATTAN'인 곳의 날짜별 시간별
df_crash_losses=df_crash\
.withColumn("CRASH_DATE_FORMATTED",F.from_unixtime(F.unix_timestamp(F.col("CRASH DATE"),"MM/dd/yyyy"),"yyyy/MM/dd"))\
.withColumn("CRASH_TIME_HH",F.lpad(F.col("CRASH TIME"),5,"0"))\
.where(F.col("CRASH_TIME_HH").between("00:00","06:00") & F.col("BOROUGH").isin("MANHATTAN"))\
.groupBy(F.col("CRASH_DATE_FORMATTED"),F.col("CRASH_TIME_HH"))\
.agg(F.max(F.col("NUMBER OF PERSONS INJURED")).alias("TOTAL_INJURED")
    ,F.max(F.col("NUMBER OF PERSONS KILLED")).alias("TOTAL_KILLED"))
    
#날짜별 부상자 숫자 순위를 메기는 컬럼과 날짜별 사망자 숫자 순위를 메기는 컬럼을 생성해서 날짜별 부상자수가 1위거나 사망자수가 1위인 시간,날짜,컬럼조회
df_crash_or=df_crash_losses\
.withColumn("MAX_INJURED_DESC_RN",F.row_number().over(Window.partitionBy(F.col("CRASH_DATE_FORMATTED")).orderBy((F.col("TOTAL_INJURED")))))\
.withColumn("MAX_KILLED_DESC_RN",F.row_number().over(Window.partitionBy(F.col("CRASH_DATE_FORMATTED")).orderBy((F.col("TOTAL_KILLED")))))\
.where((F.col("MAX_INJURED_DESC_RN") <= 1) | (F.col("MAX_KILLED_DESC_RN") <= 1))\
.select(F.col("CRASH_DATE_FORMATTED")
       ,F.col("CRASH_TIME_HH")
       ,F.col("TOTAL_INJURED")
       ,F.col("TOTAL_KILLED"))\
.orderBy(F.col("CRASH_DATE_FORMATTED")
        ,F.col("CRASH_TIME_HH"))
 
df_crash_or.show()
#spark session 
spark.stop()

pandas

import pandas as pd
pd.set_option('display.max_columns', None)
df_crash = pd.read_csv("경로/Motor_Vehicle_Collisions_-_Crashes.csv")

df_crash['CRASH_DATE_FORMATTED'] = pd.to_datetime(df_crash['CRASH DATE']).dt.strftime('%Y/%m/%d')
df_crash['CRASH_TIME_HH'] = df_crash['CRASH TIME'].str.pad(width=5, side='left', fillchar='0')
df_crash_losses = df_crash[(df_crash['CRASH_TIME_HH'].between("00:00","06:00")) & (df_crash['BOROUGH'].isin(['MANHATTAN']))]\
                                              .groupby(['CRASH_DATE_FORMATTED','CRASH_TIME_HH'],as_index=False).agg(TOTAL_INJURED=pd.NamedAgg(column='NUMBER OF PERSONS INJURED',aggfunc='max')
                                                                                                                     ,TOTAL_KILLED=pd.NamedAgg(column='NUMBER OF PERSONS KILLED',aggfunc='max'))
df_crash_losses['MAX_INJURED_DESC_RN'] = df_crash_losses.groupby('CRASH_DATE_FORMATTED',as_index=False)['TOTAL_INJURED'].rank(method='first', ascending=False).astype('int')
df_crash_losses['MAX_KILLED_DESC_RN'] = df_crash_losses.groupby('CRASH_DATE_FORMATTED',as_index=False)['TOTAL_KILLED'].rank(method='first', ascending=False).fillna(0).astype('int')
df_crash_or = df_crash_losses[((df_crash_losses['MAX_INJURED_DESC_RN'] <= 1) | (df_crash_losses['MAX_KILLED_DESC_RN'] <= 1))]
df_crash_or = df_crash_or[['CRASH_DATE_FORMATTED','CRASH_TIME_HH','TOTAL_INJURED','TOTAL_KILLED','MAX_INJURED_DESC_RN','MAX_KILLED_DESC_RN']]
df_crash_or.sort_values(by=['CRASH_DATE_FORMATTED','CRASH_TIME_HH'])

print(df_crash_or)

3. pandas의 데이터 처리 방법

1) pandas의 시간데이터

pandas에서는 시간데이터를 다룰 때 to_datetime, dt.strftime('%Y/%m/%d') 과 같이 사용

https://pandas.pydata.org/docs/reference/api/pandas.to_datetime.html

# mm-dd-yyyy, dd-mm-yyyy, mm/dd/yyyy, dd/mm/yyyy 와 같은 형태를 datetime format인 yyyy-mm-dd로 변경해줌
# to_datetime(df.컬럼이름,dayfirst=,monthfirst=,yearfirst=) 컬럼이름 다음의 인수들로 연월일 중 어떤 것이 먼저 나오는지 지정할 수 있다.
df['datetime_format'] = pd.to_datetime(df.컬럼이름)

df['user_format'] = df.dt.strftime('%Y/%m/%d')

# 위의 두 줄을 합쳐 다음과 같이 만들 수도 있다.
df['datetime_user_format'] = pd.to_datetime(df.컬럼이름).dt.strftime('%Y/%m/%d')

2) pandas의 데이터 채우기

pandas 에서는 데이터의 빈공간을 채울 때 series.str.pad()를 사용

https://pandas.pydata.org/docs/reference/api/pandas.Series.str.pad.html

# Series.str.pad(width, side='left', fillchar=' ')

# df_crash 데이터프레임의 CRASH TIME 열 데이터를 5길이까지 왼쪽으로 0을 채워넣는 코드
df_crash['CRASH_TIME_HH'] = df_crash['CRASH TIME'].str.pad(width=5, side='left', fillchar='0')

pandas에서는 데이터의 결측치를 채울 때 df.fillna()를 사용

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.fillna.html

df_crash_losses['MAX_KILLED_DESC_RN'] = df_crash_losses.groupby('CRASH_DATE_FORMATTED',as_index=False)['TOTAL_KILLED'].rank(method='first', ascending=False).fillna(0).astype('int')

3) pandas에서의 형변환

새로운 컬럼을 생성하는 방법

https://pandas.pydata.org/docs/reference/api/pandas.to_numeric.html#pandas.to_numeric

https://pandas.pydata.org/docs/reference/api/pandas.to_datetime.html

df['newcol'] = pd.to_numeric(df['existcol'])

df['newcol2'] = pd.to_datetime(df['existcol2'])

astype() 메소드 사용

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.astype.html

# astype은 null값을 형변환 할 수 없으므로 null,NaN에 대해 처리를 해준 후 사용해야 한다.
df2 = df1['col'].astype('int')

convert_dtype() 메소드 사용

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.convert_dtypes.html

# convert_dtypes 메서드는 열의 요소가 혼합된 dtype일 경우, 열의 요소를 같은 dtype으로 통일할 수 있는 가장 합리적인 형식을 갖는 pd.NA로 변환
# 데이터프레임의 NA가 포함된 컬럼들 중 가장 맞는 데이터타입으로 변환

df2 = df.convert_dtype()

4) pandas에서의 조건절

pandas에서의 where조건절 사용 (특정 시간 사이, and 구문, isin)

# pandas에서의 특정 시간사이는 pyspark과 마찬가지로 .between()을 사용하여 표현한다.
# pandas에서의 and 구문은 &을 사용하여 표현한다.
# pandas에서의 isin() 도 pyspark과 마찬가지로 .isin([])와 같이 사용한다.
# df_crash[조건] 과 같이 df를 불러와 df_crash에 조건에 맞는 df를 새롭게 저장하는 개념

df_crash = df_crash[(df_crash['CRASH_TIME_HH'].between("00:00","06:00")) & (df_crash['BOROUGH'].isin(['MANHATTAN']))]

5) 윈도우 함수 row_number() over(partition by col1 order by col2) 구현

# df.groupby('partitionbycol',as_index=False)['orderbycol'].rank('first')
# groupby를 사용하면 기본으로 그룹 라벨이 index가 됨, index를 사용하고 싶은 않은 경우에는 as_index=False 를 설정
# rank()의 method='first'로 구현해야 row_number asending=False 로 하면 내림차순정렬
df_crash_losses['MAX_INJURED_DESC_RN'] = df_crash_losses.groupby('CRASH_DATE_FORMATTED',as_index=False)['TOTAL_INJURED'].rank(method='first', ascending=False).astype('int')

6) pandas 에서의 NA, Null, NaN, None, inf

NA: Not Available의 약자로 누락된 데이터 = 결측값을 의미. 여기에는 NaN, None이 모두 포함된 개념

NaN: Not a Number의 약자로 숫자 형태의 누락된 데이터 = 결측값을 표현

None: 파이썬에서 누락된 데이터 = 결측값을 표현

null: NA와 동일하게 누락된 데이터를 의미

inf: infinite의 약자로 무한대를 의미

 

pandas에서의 None,inf

  숫자형 문자형
None NaN None
inf inf inf
pandas에서 NA == null == None이며 NaN은 숫자 데이터에 한해 표현되는 방식
그 예로 .isnull() 은 .isna()의 별칭(alias)와도 같이 사용된다.

 


7) pandas 와 pyspark의 OR, AND 처리

DESC_RN1, DESC_RN2가 있다고 가정,

where(DESC_RN1 <=1 & DESC_RN2<=1)

where(DESC_RN1 <=1 | DESC_RN2<=1)

위의 두개의 조건문이 있을 때 처리되는 방법이 각각 다르다.

  pandas pyspark
OR DESC_RN1이 1이하거나 DESC_RN2가 1이하거나 2개이상이 나올 수 있음 DESC_RN1이 1이하인 것만 나오거나, DESC_RN2이 1이하인 것만 나오거나, DESC_RN1,DESC_RN2가 둘다 1이하인것만 나옴
AND 둘 다 1이하인 것만 나오지만 ROW_NUMBER를 통해 동일 값에 대해 순위를 매기는 체계가 달라 동일한 쿼리라도 총 개수가 달라짐

 


8) pandas row,column 전부 출력하기

Method 1:

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

Method 2:

pd.options.display.max_columns = None
pd.options.display.max_rows = None

 

 

참조:

https://brownbears.tistory.com/642

728x90