본문 바로가기
BigData/Project

[Project] ETL Daily batch project(공공데이터 코로나 API, Python, PySpark, Airflow, AWS) 3. ETL python 파일 만들기

by 스파이디웹 2022. 2. 17.
728x90

이번 포스트는 공공데이터 코로나 API를 request로 받아 bs4로 lxml로 변환 후 pyspark를 이용해 데이터 처리를 해보겠습니다.

 

어떤 프로젝트인지 잘 모르시는 분은 아래의 링크를 참고해주세요.

2022.02.17 - [BigData/Project] - [Project] ETL Daily batch project(공공데이터 코로나 API, Python, PySpark, Airflow, AWS) 1. 프로젝트 개요


1. 공공데이터 API를 REST하게 받아오기

코로나감염현황 API를 불러오기 위해서는 해당 url과 일반 인증키가 필요합니다.(일반 인증키는 자신의 계정 마이페이지 활용 신청된 페이지에서 확인하실 수 있습니다.)

또한 startCreateDT, endCreateDT 매개변수를 채워줄 인자가 필요합니다.

3가지를 params로 정의하여 requests를 이요하여 호출하고, BeautifulSoup를 통해 lxml로 변환시킵니다.

import requests
from bs4 import BeautifulSoup
from datetime import datetime, date, timedelta

def getCovid19Info(start_date: date, end_date: date):
    url = 'http://openapi.data.go.kr/openapi/service/rest/Covid19/getCovid19InfStateJson'
    api_key_utf8 = 'mrAZG1yevJBgcaaSuVLOgJ%2BS6blzA0SXlGYZrwxwpARTaMnSotfqFooTr6dgKpPcTBtE96l0xE%2B%2BmXxDrWt19g%3D%3D'
    api_key_decode = requests.utils.unquote(api_key_utf8, encoding='utf-8')

    params ={
        'serviceKey' : api_key_decode,
        'startCreateDt' : int('{:04d}{:02d}{:02d}'.format(start_date.year, start_date.month, start_date.day)),
        'endCreateDt' : int('{:04d}{:02d}{:02d}'.format(end_date.year, end_date.month, end_date.day)),
    }

    response = requests.get(url, params=params)
    content = response.text
    elapsed_us = response.elapsed.microseconds
    print('Reqeust Done, Elapsed: {} seconds'.format(elapsed_us / 1e6))#100만
    
    return BeautifulSoup(content, "lxml")#불러온 text 데이터를 python xml인 lxml로 변환

코로나 API를 받아오는 함수를 오늘 날짜로 실행시키면 해당 값들을 불러오게 됩니다.


2. 받아온 API데이터를 pyspark DataFrame으로 변환시키기

  1. temp에 API를받아 lxml로 변환한 값을 저장합니다.
  2. temp에서 item만 추출하여 tag의 이름과 내용을 추출하여 dictionary형태의 list로 각 내용을 저장합니다.
  3. dictionary들의 list로 이루어진 것을 dataframe으로 변환하기전 schema를 정의하고, dataframe으로 변환시켜줍니다.
import requests
from bs4 import BeautifulSoup
from datetime import datetime, date, timedelta
from typing import Union
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, DoubleType, IntegerType, TimestampType, StringType
#sparksession 드라이버 프로세스 얻기
#이후에 나오는 mysql에 데이터 저장하기에 사용될 mysql connector jars파일을 옵션에 추가해준다.
spark = SparkSession.builder.master("local[*]").config("spark.driver.extraClassPath","C:/spark/spark-3.1.2-bin-hadoop2.7/jars/mysql-connector-java-8.0.28").appName("pyspark").getOrCreate()

def getCovid19SparkDataFrame(start_date: Union[date, datetime], end_date: Union[date,datetime]):
    #key값의 데이터 타입 정의
    convert_method = {
        'accdefrate' : float,
        'accexamcnt' : int,
        'createdt' : lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f'),
        'deathcnt' : int,
        'decidecnt' : int,
        'seq' : int,
        'statedt' : str,
        'statetime' : str,
        'updatedt' : lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f'),
    }

    temp = getCovid19Info(start_date, end_date)
    items = temp.find('items')#item만 추출
    item_list = []
    for item in items:
        item_dict = {}
        for tag in list(item):
            try:
                item_dict[tag.name] = convert_method[tag.name](tag.text)
            except Exception:
                item_dict[tag.name] = None
        item_list.append(item_dict)#lxml데이터를 dictionary in list로 변환

    #dictionary의 list를 spark DataFrame으로 바꾸는 방법 dictionary 속의 key를 schema를 정의해준다.
    CovidInfo_item_Schema = StructType([
        StructField('accdefrate', DoubleType(), True),#누적확진률
        StructField('accexamcnt', IntegerType(), True),#누적의심신고검사자
        StructField('createdt', TimestampType(),True),#등록일시분초
        StructField('deathcnt', IntegerType(), True),#사망자 수
        StructField('decidecnt', IntegerType(), True),#확진자 수
        StructField('seq', IntegerType(), True),#게시글 번호(감염현황 고유값)
        StructField('statedt', StringType(), True),#기준일
        StructField('statetime', StringType(), True),#기준시간
        StructField('updatedt', TimestampType(), True)#수정일시분초
    ])
    
    df_Covid19 = spark.createDataFrame(item_list,CovidInfo_item_Schema)
    return df_Covid19

한번 dataframe으로 잘 만들어지는 지 확인해보겠습니다.

코로나가 시작된 이래로 오늘까지의 데이터를 pyspark dataframe으로 저장

now = datetime.now()

def getAllCovid19Data():
    df_Covid19 = getCovid19SparkDataFrame(date(2019,1,1), now)
    print("Today: {}\nLoaded {} Records".format(now.strftime('%Y-%m-%d'),df_Covid19.distinct().count()))
    return df_Covid19

데이터를 dataframe으로 저장된 것을 확인 할 수 있습니다.


3. 의미있는 데이터 추출하기

의미 있는 데이터라 하면 일반 사람들이 보기에 편한 정보, 필요한 정보 그리고 원하는 날짜별로 볼 수 있어야된다는 생각을 가지고 만들었습니다.

 

  1. 불러올 API데이터에서 기준날짜, 당일확진자수, 누적확진자수, 당일사망자수, 누적사망자수, 치명률을 추출하여 보여줍니다. (원래는 코로나 검사자 수 데이터도 취급했지만, 2022년 2월부로 확증자 폭증에 따라 검사자 수는 의미가 없어 제외)
  2. 오늘, 전체 , 원하는 기간 동안의 데이터를 사용자가 직접 정할 수 있게 끔 함수를 3가지로구현
#코로나 매일의 확진자, 누적확진자, 사망자, 누적사망자, 치명률을 뽑아낸 dataframe
#임시 선별 검사자 수는 제외(api를 못구함)
#데이터 조회시 메소드.show() ex)getAllCovidAffectedNumbers().show()
def getAllCovidAffectedNumbers():
    from pyspark.sql import Window
    df_Covid19 =  getCovid19SparkDataFrame(date(2019,1,1), now)
    df_Covid19_result_All=\
    df_Covid19.distinct()\
              .withColumn("decidecnt-1",F.coalesce(F.lead(F.col("decidecnt"),1).over(Window.orderBy(F.col("statedt").desc())),F.lit(0)))\
              .withColumn("deathcnt-1",F.coalesce(F.lead(F.col("deathcnt"),1).over(Window.orderBy(F.col("statedt").desc())),F.lit(0)))\
              .withColumn("기준날짜",F.from_unixtime(F.unix_timestamp(F.col("statedt"),"yyyyMMdd"),"yyyy-MM-dd"))\
              .select(F.col("기준날짜")
                     ,(F.col("decidecnt")-F.col("decidecnt-1")).alias("당일확진자수")
                     ,F.col("decidecnt").alias("누적확진자수")
                     ,(F.col("deathcnt")-F.col("deathcnt-1")).alias("당일사망자수")
                     ,F.col("deathcnt").alias("누적사망자수")
                     ,F.round((F.col("deathcnt")/F.col("decidecnt")*F.lit(100)),2).alias("전체확진자치명률(%)"))
    return df_Covid19_result_All

#코로나 오늘의 확진자, 누적확진자, 사망자, 누적사망자, 치명률 뽑아낸 dataframe
#임시 선별 검사자 수는 제외(api를 못구함)
#데이터 조회시 메소드.show() ex)getAllCovidAffectedNumbers().show()    
def getTodayCovidAffectedNumbers():
    from pyspark.sql import Window
    now_date=now.strftime('%Y-%m-%d')
    df_Covid19 =  getCovid19SparkDataFrame(date(2019,1,1), now)
    df_Covid19_result_Today=\
    df_Covid19.distinct()\
              .withColumn("decidecnt-1",F.coalesce(F.lead(F.col("decidecnt"),1).over(Window.orderBy(F.col("statedt").desc())),F.lit(0)))\
              .withColumn("deathcnt-1",F.coalesce(F.lead(F.col("deathcnt"),1).over(Window.orderBy(F.col("statedt").desc())),F.lit(0)))\
              .withColumn("기준날짜",F.from_unixtime(F.unix_timestamp(F.col("statedt"),"yyyyMMdd"),"yyyy-MM-dd"))\
              .where(F.col("기준날짜")==now_date)\
              .select(F.col("기준날짜")
                     ,(F.col("decidecnt")-F.col("decidecnt-1")).alias("당일확진자수")
                     ,F.col("decidecnt").alias("누적확진자수")
                     ,(F.col("deathcnt")-F.col("deathcnt-1")).alias("당일사망자수")
                     ,F.col("deathcnt").alias("누적사망자수")
                     ,F.round((F.col("deathcnt")/F.col("decidecnt")*F.lit(100)),2).alias("전체확진자치명률(%)"))
    return df_Covid19_result_Today

#코로나 범위내의 확진자, 누적확진자, 사망자, 누적사망자, 치명률 뽑아낸 dataframe
#임시 선별 검사자 수는 제외(api를 못구함)
#데이터 조회시 메소드.show() ex)getAllCovidAffectedNumbers().show()
def getPeriodCovidAffectedNumbers(start_date: Union[date, datetime], end_date: Union[date,datetime]):
    from pyspark.sql import Window
    df_Covid19 =  getCovid19SparkDataFrame(date(2019,1,1), now)
    df_Covid19_result_Period=\
    df_Covid19.distinct()\
              .withColumn("decidecnt-1",F.coalesce(F.lead(F.col("decidecnt"),1).over(Window.orderBy(F.col("statedt").desc())),F.lit(0)))\
              .withColumn("deathcnt-1",F.coalesce(F.lead(F.col("deathcnt"),1).over(Window.orderBy(F.col("statedt").desc())),F.lit(0)))\
              .withColumn("기준날짜",F.from_unixtime(F.unix_timestamp(F.col("statedt"),"yyyyMMdd"),"yyyy-MM-dd"))\
              .where(F.col("기준날짜").between(start_date,end_date))\
              .select(F.col("기준날짜")
                     ,(F.col("decidecnt")-F.col("decidecnt-1")).alias("당일확진자수")
                     ,F.col("decidecnt").alias("누적확진자수")
                     ,(F.col("deathcnt")-F.col("deathcnt-1")).alias("당일사망자수")
                     ,F.col("deathcnt").alias("누적사망자수")
                     ,F.round((F.col("deathcnt")/F.col("decidecnt")*F.lit(100)),2).alias("전체확진자치명률(%)"))
    return df_Covid19_result_Period, start_date, end_date#이후에 함수별로 저장할 때, DF가 기간으로 입력받은 것을 알기위해 기간값도 리턴

4. 추출한 데이터 CSV파일로 저장, mysql Database에 저장

  1. 추출한 데이터를 CSV형태의 단일파일로 쓰는데, 여기서 매개변수를 dataframe으로 두고 해당 데이터프레임이 오늘날짜인지, 전체날짜인지, 특정 기간인지에 따라 디렉토리를 구분하여 저장하는 함수 구현
  2. 추출한 데이터를 CSV형태의 파일로 쓰는데, 기준날짜를 기준으로 partitioned된 즉, partitionBy("기준날짜")를 적용하여 저장, 마찬가지로 추출하고자 하는 데이터의 기간에 따라 매개변수를 준다.
  3. 추출한 데이터를 mysql database에 insert하는데 특정 기간이 아닌 모든 데이터만을 저장하는 기능을 구현하는 함수를 만든다. (해당 링크에서 mysql connector를 다운받아 sparksession생성할 때 option에 jar파일을 추가해준다.)(mysql에 미리 database, table, user를 만들어 놔야한다.)   https://dev.mysql.com/downloads/connector/j/
  4. 모든 추출된 데이터는 멱등성을 위해 overwrite으로 데이터를 쓴다.
  5. 여러번 scan되는 데이터프레임의 scan시간을 줄이기위해 cache()를 통해 데이터 스캔 횟수를 줄인다.
def saveDataAsCSV(dataframe):
    if type(dataframe) == tuple:#기간을 받아 온 dataframe은 return할 시 (dataframe,start_date,end_date) 튜플형태로 리턴하기 때문에 인덱싱으로 제어
        dataframe[0].cache()#cache의 수명은 함수가 호출되서 끝나기까지 이므로 매 함수마다 적용
        start_date=dataframe[1]
        end_date=dataframe[2]
        if dataframe[0].count() == getPeriodCovidAffectedNumbers(start_date, end_date)[0].count():
            print("it worked!")
            return dataframe[0].coalesce(1).write.format("csv").option("header","true").mode("overwrite").save("C:/covid19_result/period/")
    else:
        dataframe.cache()
        if dataframe.count() == getAllCovidAffectedNumbers().count():#dataframe처음 호출, 함수를 통한 dataframe처음 호출, dataframe != getAllCovidAffectedNumbers()
            print("it worked!")
            return dataframe.coalesce(1).write.format("csv").option("header","true").mode("overwrite").save("C:/covid19_result/all_day/")#count때 호출된 dataframe이 cache된다.
        elif dataframe.count() == getTodayCovidAffectedNumbers().count():
            print("it worked!")
            return dataframe.coalesce(1).write.format("csv").option("header","true").mode("overwrite").save("C:/covid19_result/today/")

#파티션별로 나누어 파일 저장(daily partition별로 데이터 적재) 
def saveDataAsPartitionCSV(dataframe):
    if type(dataframe) == tuple:
        dataframe[0].cache()#cache의 수명은 함수가 호출되서 끝나기까지 이므로 매 함수마다 적용
        start_date=dataframe[1]
        end_date=dataframe[2]
        if dataframe[0].count() == getPeriodCovidAffectedNumbers(start_date, end_date)[0].count():
            print("it worked!")
            return dataframe[0].write.format("csv").option("header","true").mode("overwrite").partitionBy("기준날짜").save("C:/covid19_result/partition/period/")
    else:
        dataframe.cache()
        if dataframe.count() == getAllCovidAffectedNumbers().count():
            print("it worked!")
            return dataframe.write.format("csv").option("header","true").mode("overwrite").partitionBy("기준날짜").save("C:/covid19_result/partition/all_day/")
        elif dataframe.count() == getTodayCovidAffectedNumbers().count():
            print("it worked!")
            return dataframe.write.format("csv").option("header","true").mode("overwrite").partitionBy("기준날짜").save("C:/covid19_result/partition/today/")

#mysql DB에 Covid19 DataFrame의 데이터를 저장
def saveDataToMySQL(dataframe):
    if type(dataframe) == tuple:
        dataframe[0].cache()#cache의 수명은 함수가 호출되서 끝나기까지 이므로 매 함수마다 적용
    else:
        dataframe.cache()
        
    if dataframe.count() == getAllCovidAffectedNumbers().count():
        print("insert to mysql Covid19 table")
        return dataframe.coalesce(1).write.format("jdbc").options(
            url='jdbc:mysql://localhost:3306/COVID19',
            driver='com.mysql.cj.jdbc.Driver',
            dbtable='Covid_19_info',
            user='root',
            password='root'
        ).mode('overwrite').save()

5. 메인함수(daily batch) 구현

이후 EC2서버에서 동일하게 구현했을 때, 매일매일 데이터를 받아와 적재하기 위해서 메인함수를 구현합니다.

  1. 오늘의 확진자수를 어제와 비교하여 오늘 확진자 수를 출력하고, 어제와 비교하여 얼마나 증감이 있었는지를 출력
  2. 오늘의 데이터가 아직 입력되지 않은 경우, 안내문구 출력
  3. 이후에 로컬에 CSV파일로 저장하는 함수, 로컬에 partitionBy로 저장하는 함수, MySQL에 데이터를 입력하는 함수를 실행시키고 끝낸다.
if __name__=="__main__":
    try:
        today_infection_num = (getTodayCovidAffectedNumbers().first()["당일확진자수"])
        infection_num_diff = (getTodayCovidAffectedNumbers().first()["당일확진자수"] - 
                                                     getPeriodCovidAffectedNumbers(today-oneday,today)[0].where(F.col("기준날짜")==str(today-oneday)).first()["당일확진자수"])
        print("오늘(%s)의 확진자수는 %d명입니다.\n" % (today, today_infection_num))#df.first()['column name'] 혹은 df.collect()[0]['column name'], 오늘의 데이터가 없을 경우 none type이 되어 에러를 낸다.try except 처리
        if infection_num_diff >= 0:
            print("어제보다 코로나 확진자가 %d명 늘었습니다.\n" % (infection_num_diff))
        else:
            print("어제보다 코로나 확진자가 %d명 줄었습니다.\n" % (-infection_num_diff))
    except TypeError:
        print("오늘의 데이터가 아직 입력되지 않았습니다.")
        
    saveDataAsCSV(getAllCovidAffectedNumbers())
    saveDataAsPartitionCSV(getAllCovidAffectedNumbers())
    saveDataToMySQL(getAllCovidAffectedNumbers())

전체 소스코드 파일

api_ETL_pypsark.py
0.01MB

다음 포스트에는 EC2서버를 띄워 로컬과 동일한 환경을 구성하고

pyspark설치 및 airflow설치를 진행해보겠습니다.

728x90

댓글