BigData/Spark & Spark Tuning

[Spark] spark standalone 모드로 pyspark 실습(.py spark submit)+ui확인하기

스파이디웹 2021. 11. 8. 17:49
728x90

이번 포스트에서는 spark standalone 모드 즉, yarn을 이용하지 않고 local(단일 노드로) pyspark을 제출하는 .py 파일을 생성해 스크립트 실행을 시켜 제출해보도록 하겠습니다.


하둡 및 스파크를 설치하지 못하신 분은 아래의 링크를 참고해 주세요.

 

2021.04.26 - [BigData] - [Hadoop] virtual box linux [ubuntu 18.04]에 하둡 설치,다운로드 1.virtualbox에 ubuntu 설치하기

 

[Hadoop] virtual box linux [ubuntu 18.04]에 하둡 설치,다운로드 1.virtualbox에 ubuntu 설치하기

1.virtual box를 다운로드한다. www.virtualbox.org/wiki/Downloads Downloads – Oracle VM VirtualBox Download VirtualBox Here you will find links to VirtualBox binaries and its source code. VirtualBox..

spidyweb.tistory.com

 

2021.05.16 - [BigData] - [Spark] virtual box linux [ubuntu 18.04]에 스파크 설치,다운로드 5.ubuntu 에 spark(스파크) 다운로드,설치

 

[Spark] virtual box linux [ubuntu 18.04]에 스파크 설치,다운로드 5.ubuntu 에 spark(스파크) 다운로드,설치

이번 포스트에는 Spark를 설치해 보겠습니다. vritualbox 설치,ubuntu설치, ssh통신, hadoop 설치가 완료 되지 않으신 분은 아래의 URL을 참고하여 완료해 주세요. 1.virtualbox 설치 및 ubuntu 설치 spidyweb.tis..

spidyweb.tistory.com


실습용 데이터
https://data.cityofnewyork.us/Public-Safety/Motor-Vehicle-Collisions-Crashes/h9gi-nx95

 

Motor Vehicle Collisions - Crashes | NYC Open Data

 

data.cityofnewyork.us

뉴욕시티에서 일어난 자동차,오토바이 등 탈 것에 의한 충돌횟수에 대한 데이터


실습 시나리오

1. local에 newyork city crash csv파일을 다운로드 한다.
2-1. hdfs에 source data가 들어갈 directory를 생성한다.
2-2. local에 있는 csv파일을 2-1번에서 생성한 directory에 put한다.
3. pytyon 파일을 작성한다.

  1. sparksession 생성
  2. csvfile read하여 df_crash에 저장
  3. df_crash로 부터 날짜와 시간이 yyyy/MM/dd 와 HH:mm 형식으로 바뀌고, 시간대가 00:00~06:00시에 일어난 사건 중에 지역이 'MANHATTAN'인 곳의 날짜별 시간별 다치거나 사망한 수를 집계하여 df_crash_losses에 저장
  4. df_crash_losses로 부터 날짜별 부상자 숫자 순위를 메기는 컬럼과 날짜별 사망자 숫자 순위를 메기는 컬럼을 생성해서 날짜별 부상자수가 1위거나 사망자수가 1위인 날짜,시간,부상자수,사망자수를 조회하는 컬럼을 날짜,시간순으로 오름차순 정렬함
  5. parquet파일로써  file을 write하고, write하는 형식은 overwrite을 사용해서 멱등성을 유지, 날짜별 partition나눠서 file write

4. python 파일을 실행 및 결과 확인

조건

file을 read하는 foramt은 csvforamt으로 읽는다.
file을 write되는 format은 parquet format의 테이블이다.
python 파일은 멱등성(여러번 실행해도 같은 결과가 나오게끔)이 가능하도록 구성한다.

 

만들고자 하는 것

00:00 시에서 06:00시까지 맨해튼에서 날짜별 시간별 사망자 수가 제일 많거나, 부상자 수가 제일 많은

부상자 수, 사망자 수

(조건: 날짜별,시간별로 오름차순 정렬이 되게끔 하기)

 

 


1. local에 csv파일 다운로드

위의 사진에서 csv를 우클릭 후 링크 주소 복사를 통해 URL을 복사합니다.

1) 하둡 계정의 root디렉토리에서 wget url

wget이 없다면 wget을 다운 받아야합니다.

wget https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv?accessType=DOWNLOAD

2) 생성된 파일 확인 및 이름변경

mv 'rows.csv?accessType=DOWNLOAD' Motor_Vehicle_collisions_-_Crashes.csv

2-1. HDFS 디렉토리 생성

1) 하둡을 실행

start-all.sh

2)hdfs에 /user/hadoop/spark 디렉토리 생성

(제 유저 이름은 Hadoop입니다. root 디렉토리가 /User/Hadoop)

hdfs dfs -mkdir /user/hadoop/spark

3) hdfs에 생성된 디렉토리 확인

hdfs dfs -ls /User/Hadoop


2-2. HDFS 디렉토리에 csv파일 PUT

1) PUT

hdfs dfs -put Motor_Vehicle_Collisions_-_Crashes.csv /user/hadoop/spark

2) 확인하기

hdfs dfs -ls /user/hive/spark


3. python 파일 작성하기

pyspark를 yarn없이 단일 노드에 돌리려면 script에 findspark가 있어야 pyspark 라이브러리를 사용할 수 있습니다.

findspark 다운로드

pip3 install findspark -> /home/hadoop/.local/lib/python3.8/site-packages 에 들어감

1) hadoop계정의 root디렉토리에서 파일생성 및 편집

touch pyspark_ETL_parqeut.py

nano pyspark_ETL_parqeut.py

(nano가 안되시면 다운받아야합니다.)

 

2) 원하는 결과물

00:00~06:00 시간내에 맨해튼에서 날짜별, 시간별 부상자가 가장 많거나, 사망자가 가장 많은 데이터 구하기
CRASH_DATE
CRASH_TIME
NUMBER_OF_PERSONS_INJURED
NUMBER_OF_PERSONS_KILLED
컬럼을 사용


처리방법

CRASH_DATE 는 MM/dd/yyyy형태, CRASH_TIME은 H:mm 혹은 HH:mm형태
따라서,
1) 오름차순 정렬을 위한, CRASH_DATE를 yyyy/MM/dd형태로 바꾸기위해 unix_timestamp, from_unixtime을 사용

2) 조건으로 00:00과 06:00사이에 있는 데이터를 대상으로 하기때문에, CRASH_TIME을 HH:mm형태로 만들기 위해 LPAD를 사용

3) BOROUGH의 값이 "MANHATTAN"인 것만 추출

4)날짜별, 시간별 부상자수와 사망 수를 구하기

5)날짜별 부상자수를 row_number를 통해 내림차순 등수 구하기, 사망자수를 row_number를 통해 내림차순 등수 구하기

6)날짜별 부상자수가 1위이거나, 사망자수가 1위인 조건을 걸어 날짜,시간,부상자 수, 사망자 수를 구하기

7)날짜, 시간순으로 오름차순 정렬

 

8) 날짜별 파티션으로 overwrite mode로 parquet형식으로 저장


3) pyspark_ETL_parquet.py 내용

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import findspark
findspark.init()
 
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
#sparksession 드라이버 프로세스 얻기
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
 
df_crash = spark.read.format("csv").option("header","true").option("inferschema","true")\
                .load("hdfs://localhost:9000/user/spark/Motor_Vehicle_Collisions_-_Crashes.csv")
 
#날짜와 시간이 yyyy/MM/dd 와 HH:mm 형식으로 바뀌고, 시간대가 00:00~06:00시에 일어난 사건 중에 지역이 'MANHATTAN'인 곳의 날짜별 시간별
df_crash_losses=df_crash\
.withColumn("CRASH_DATE_FORMATTED",F.from_unixtime(F.unix_timestamp(F.col("CRASH DATE"),"MM/dd/yyyy"),"yyyy/MM/dd"))\
.withColumn("CRASH_TIME_HH",F.lpad(F.col("CRASH TIME"),5,"0"))\
.where(F.col("CRASH_TIME_HH").between("00:00","06:00"& F.col("BOROUGH").isin("MANHATTAN"))\
.groupBy(F.col("CRASH_DATE_FORMATTED"),F.col("CRASH_TIME_HH"))\
.agg(F.max(F.col("NUMBER OF PERSONS INJURED")).alias("TOTAL_INJURED")
    ,F.max(F.col("NUMBER OF PERSONS KILLED")).alias("TOTAL_KILLED"))
    
#날짜별 부상자 숫자 순위를 메기는 컬럼과 날짜별 사망자 숫자 순위를 메기는 컬럼을 생성해서 날짜별 부상자수가 1위거나 사망자수가 1위인 시간,날짜,컬럼조회
df_crash_or=df_crash_losses\
.withColumn("MAX_INJURED_DESC_RN",F.row_number().over(Window.partitionBy(F.col("CRASH_DATE_FORMATTED")).orderBy((F.col("TOTAL_INJURED").desc()))))\
.withColumn("MAX_KILLED_DESC_RN",F.row_number().over(Window.partitionBy(F.col("CRASH_DATE_FORMATTED")).orderBy((F.col("TOTAL_KILLED").desc()))))\
.where((F.col("MAX_INJURED_DESC_RN"<= 1| (F.col("MAX_KILLED_DESC_RN"<= 1))\
.select(F.col("CRASH_DATE_FORMATTED")
       ,F.col("CRASH_TIME_HH")
       ,F.col("TOTAL_INJURED")
       ,F.col("TOTAL_KILLED"))\
.orderBy(F.col("CRASH_DATE_FORMATTED")
        ,F.col("CRASH_TIME_HH"))
 
df_crash_or.write.format("parquet").mode("overwrite")\
           .partitionBy("CRASH_DATE_FORMATTED").save("hdfs://localhost:9000/user/spark/total_injured_killed_datetime")
 
#spark session 
spark.stop()
 
cs

spark 실습 ubuntu(linux).txt
0.00MB


4 .py 파일 spark submit을 통한 실행 및 결과 확인

1) .py 파일 spark-submit 을 통해 실행

spark-submit pyspark_ETL_parquet.py

2) UI확인 및 결과 파일 확인하기

4040포트가 포트포워딩 되어 있어야 합니다.(spark ui)

9870 포트가 포트포워딩 되어 있어야 합니다.(hdfs namenode)

날짜별 partition 된 디렉토리 및 파일확인

 


알게 된 점

*data node 가 안떠있으면 file write가 안되니 datanode가 떠있는지 확인.

*파티션된 디렉토리별 들어있는 데이터의 수가 적어 효율은 나오지 않는다.(ETL작업 및 조회의 성능 등) ->실무에서 날짜별, 시간별로 partition 나눠 파일을 쓰는 케이스가 많아 테스트.

*당연한 얘기지만 master에 yarn이 아닌 local로 돌리면 resource manager에는 기록되지 않는다.

728x90