728x90
이번 포스트에는 아주 간단한 ETL을 구현하는 프로젝트를 해보겠습니다.
프로젝트 자체는 필요한 것만 최소한의 시간으로 최소한의 비용으로 구성했으며, 점차 추가기능들을 구현하거나 수정할 부분을 수정하여, sub version을 포스트 하겠습니다.
프로젝트에서 다루는 것
ETL python파일
- corona API를 restAPI로 받아오는 법 --수집
- python library인 bs4를 이용하여 lxml로 파싱후, list로 변환 --처리
- list를 pyspark dataframe으로 변환 --처리
- 오늘, 전체, 날짜별 dataframe을 단일 csv파일로 저장 --저장
- 오늘, 전체, 날짜별 dataframe을 기준날짜를 기준으로 partition하여(partitionBy) csv파일로 저장 --저장
- dataframe을 mysql에 저장 --저장
- 당일확진자 수, 어제확진자와 비교하여 증감 수 출력
Daily Batch환경 구성
- AWS EC2 구성하는 구성하는 법
- EC2내 spark 설치하는 법
- EC2내 airflow 설치하고 서비스(데몬)으로 실행시키는 법
- EC2내 airflow를 이용하여 ETL 파이썬 파일 실행시키고 s3로 데이터 보내기(복사하기)
- EC2를 airflow로 자동으로 종료하는 법
- EC2 image builder(pipeline)을 통해 정해진 시간에 EC2 띄우는법
개발 환경 및 ec2 서버 성능, 버전
windows 10 (16GB mem, 4core)
jupyternotebook(anaconda-5.2.0)
- 대화식 환경이라 짠 코드를 바로 확인하기 좋음
spark-3.1.2-hadoop2.7
- pandas가 작은 크기의 데이터처리에는 빠르고 굳이 spark를 사용할 필요는 없지만, 차후에 큰 데이터를 처리하기위해 spark(pyspark)로 처리하는데 의의를 둠
python3 ++
- bs4, requests, datetime - 간단하게 restAPI를 불러오고 파싱 및 처리할 수 있기에 사용
- pyspark
ec2 ubuntu 20.04 LTS 8GB 2core
- local(windows)의 환경과 비슷하면서 최소한의 비용으로 spark, airflow를 문제없이 돌릴 수 있는 환경
python3 ++
- local에서의 환경과 동일하게 구성
airflow-2.2.3
- 최신 버전, 도커 없이 수동설치해보기 위함
spark-3.1.2-bin-hadoop2.7
- local에서의 환경과 동일하게 구성
image builder
- 이미지를 저장하고 builder를 사용하는데에는 비용이 들지 않기 때문에 사용
- boto3, cloudformation, lambda, teraform을 사용하는데 익숙하지않음
ETL 프로젝트 구현 과정 및 결과물
구현 과정
- 공공데이터포털 가입 및 api 활용 신청
- 코로나 api 불러와서 bs4 library를 이용하여 lxml로 파싱
- 파싱된 lxml를 데이터타입 정의하여 list로 변환
- list에 스키마를 입혀 pyspark dataframe으로 변환
- 전체, 오늘, 범위별 (기준일자, 당일확진자수, 누적확진자수, 당일사망자수, 누적사망자수,
당일의심신고검사자수, 누적의심신고검사자수, 전체확진자치명률)을 구하는 Dataframe 반환하는 함수 구현 - 반환된 Dataframe으로 단일CSV파일로 write, 기준일자별 partition된 csv파일로 write(+cache를 통한 data scan과정줄이기)
- 반환된 Dataframe으로 mysql에 데이터 insert(+cache를 통한 data scan과정줄이기)
- AWS EC2 구성하는 구성하는 법
- EC2내 spark 설치하는 법
- EC2내 airflow 설치하고 서비스(데몬)으로 실행시키는 법(+메타 데이터베이스 mysql로 설정)
- EC2내 airflow를 이용하여 ETL 파이썬 파일 실행시키고 s3로 데이터 보내기(복사하기)
- EC2를 airflow로 자동으로 종료하는 법
- EC2 image builder(pipeline)을 통해 정해진 시간에 EC2 띄우는법
결과물
매일 10시쯤에는 공공데이터 포털에 코로나api데이터가 반영된다.
-> 9시55분에 ec2 image builder를 통해 ec2서버를 띄운다(자동화)
airflow dags를 통해 job을 10시에 실행 시키도록 한다.
-> airflow를 통해 pyspark job을 실행시켜 local에 파일을 떨어트리고, mysql에 데이터를 입력
-> airflow를 통해 local에 떨어진 파일을 s3로 복사, local에 떨어진 airflow log를 s3로 복사
(s3에 local에 생성된 pyspark job을 통한 csv파일, airflow log파일이 적재)
비용절감을 위해 모든 job이 끝난 ec2서버는 종료시킨다.
-> airflow를 통해 7분뒤 ec2를 자동으로 종료시키게 한다.(자동화)
728x90
댓글