본문 바로가기
BigData/Apache Airflow

[Airflow] Airflow로 ETL 파이프라인 V2 만들기 (Variables + TaskGroup을 활용한 Task Factory 구현)

by 스파이디웹 2024. 2. 9.
728x90

이번 포스트에는 저번에 작성했었던 각 작업들을 통합하여 만든 DAG인 ETL 파이프라인 구축하기 V1에서 중복된 태스크는 줄이고, 병렬실행을 도입시켰으며, task를 정의한 Variables 변수에 따라 Dynamic하게 갯수와 이름이 변하게 되게 끔 구성하였습니다.

저번에 작성한 포스트를 보고 싶으시다면 아래의 링크를 확인해주세요.

https://spidyweb.tistory.com/506

 

[Airflow] Airflow로 ETL 파이프라인 만들기(python, EMR, glue crawler, Email, Slack, DB반영)

이번 포스트에는 회사에서 수동으로 처리하고 있던 일회성 ETL작업을 Airflow DAG으로 묶어서 파이프라인을 만든 경험에대해서 소개해드리겠습니다. 기존 업무 처리방식과 Airflow DAG의 필요성 ETL 방

spidyweb.tistory.com

 


1. 문제점과 바뀐 점

1-1) 비효율적인 task 구성 → 다수의 ETL 하나의 DAG로 병렬처리

현재 쓰이고 있던 ETL DAG은 1번의 실행당 1번의 ETL밖에 처리하지 못하는 단점이 있었습니다.

즉, 3개의 각기 다르거나 같은 DB내에서의 ETL 요청이 들어온다면 총 3번의 실행을 작업이 끝나기를 기다렸다가 재실행 시켜줘야 하는 흔히말하는 사람이 신경을 써줘야하는 프로세스상의 번거로움이 있었습니다.

 

따라서, 여러개의 ETL작업을 하나의 DAG로 끝내면 편해질 것이라는 생각과 그 ETL작업도 순차처리가 아닌, 비슷한 작업들은(TSV 생산, Parquet 변환 등) 하나의 TaskGroup으로 묶어 병렬처리를 하면 효율적이겠다는 생각이 들었습니다.

 

1-2) EMR on EC2 → EMR Serverless

ETL 요청은 다양한 종류와 크기의 DB로부터 요청이 들어오게 되는데, 그에 따라서 적당한 EMR의 스펙을 매번 정하는 것은 굉장히 번거롭고 어려운 일입니다.

저희 회사는 얼마 전부터 어느 정도 작은 workload에 한에서는 EMR Serverless를 적극 도입해서 사용하고 있는데요.

이전에 EMR on EC2를 사용할 때는, 적당한 크기의 클러스터에서(한정된 자원에서) 여러 ETL작업들을 병렬처리 하는 것에 대해서 작업에 대한 지연이 있을까봐 병렬처리가 아닌 순차 처리로 구성할 수 밖에 없었는데요.

EMR Serverless는 탄력적인 자원의 활용과 여러 App을 실행시키더라도, 각각 자원의 간섭없이 병렬적으로 처리될 수 있다는 점이 이번 개선사항에서는 필수적이였습니다.

 

1-3) 중복된 glue crawler 실행 → Set을 사용하여 중복된 작업이 있을 시, 중복 제거하여 정확히 한 번 실행

우리 회사의 경우 glue crawler는 S3 디렉토리에 정의된 각 DB이름과 경로 별로 지정되어 있어서, 동일한 DB에 대해서 요청이 여러 개 들어온 경우 glue crawler를 요청받은 ETL 수 만큼 실행시키고 있기 때문에 시간적으로도 비용적으로도 매번 낭비였어서 여러 개의 요청이 들어온 경우 정확히 한 번만 수행시키면 좋겠다고 늘 생각하고 있었습니다.

 

따라서 요청받은 각 ETL 데이터 셋별로 취합하여 python set으로 만들어 중복을 제거 한 후 정확히 한 번만 병렬로 실행되게끔 Task 구성을 변경하였습니다.

 

ex) ETL요청에 따라 MariaDB1_crawler, MariaDB1_crawler, OracleDB1_crawler 3개를 돌려야 하는 상황이라면

{MariaDB1_crawler,OracleDB1_crawler} 처럼 유일한 crawler 리스트로 만드는 작업을 추가시켰습니다.

 


2. ETL 파이프라인 V2 구성

일단 전체적인 흐름과 파이프라인을 소개시켜드리면, 최초에 저희 ETL관리 페이지로부터 Airflow Variables값을 넘겨받고(수정하고) 이후에 바로 ETL파이프라인V2가 실행되게끔 하는 bash command를 날리게 됩니다.

 

airflow variables 형태는 리스트 속에 ETL 요청 수 만큼 dictionary 데이터 셋을 받을 수 있게 구성했습니다.

[{"task_id":"task_id_value1",...},{"task_id":"task_id_value2",...},{"task_id":"task_id_value3",...}]

전체적인 ETL 파이프라인 흐름
TaskGroup에 정의된 Dynamic Task

2-1) 핵심 아이디어

전체적인 개념은 이전의 포스트와 같습니다. 단지 핵심 개념은

  1. DAG에서 여러개를 받는 Dynamic한 TaskGroup(어떻게 보면 Task Factory 개념?)
  2. 병렬처리가 가능한 부분은 병렬로 처리
  3. Glue Crawler는 정확히 한번만 처리
  4. Slack 상세정보 각 Task별로 계산 및 적용
  5. 각 ETL 요청자에게 수행 완료 메일 전송

DAG에서 여러개를 받는 Dynamic한 TaskGroup

from airflow.utils.task_group import TaskGroup

# TaskGroup 정의하는 법 1, TaskGroup 내부에 각 task들을 직접 정의한다.
with TaskGroup(group_id='TSV') as TSV:
    python_task_1 = PythonOperator(~~)
    python_task_2 = PythonOperator(~~)
    SSH_task_1 = SSHOperator(~~)
    SSH_task_2 = SSHOperator(~~)
    
    # 정의된 각 task들을 순차 혹은 병렬로 구성한다.
    python_task_1 >> python_task_2 >> SSH_task_1 >> SSH_task_2
    
    # 병렬로 구성한다. 리스트안에 task들을 나열하면 병렬로 구성된다.
    [python_task_1, python_task_2, SSH_task_1, SSH_task_2]
    
    # chain을 통해 순차적으로 구성from airflow.models.baseoperator import chain을 불러와야 함
    chain(python_task_1, python_task_2, SSH_task_1, SSH_task_2)

# TaskGroup 정의하는 법 2, TaskGroup 내부에 하나의 Task를 Iterate돌려 List로 만든다(List Comprehension)
with TaskGroup(group_id='TSV') as TSV:
	# task_id가 Variable에 의해 dynamic하게 정의된다.
    TSV_tasks = [SSHOperator(task_id=f"{i['task_id'}",ssh_conn_id="ec2연결",command=f"python3 /~" for i in variables]
    
    # 정의된 task들을 그대로 사용하면 병렬로 처리된다. [task_id_1, task_id2, task_id3] 와 같은 개념
    TSV_tasks
    
    # 위와 동일하게 병렬로 처리하는 것을 명시적으로 보여주는 방법
    [i for i in TSV_tasks]
    
    # 정의된 task들을(Task Lists) 순차적으로 실행하는 방법
    chain(*TSV_tasks) # TaskGroup밑에 정의된 태스크들을 어떤 형태로 실행시킬 지 정의해줘야 함

TaskGroup 정의하는 법 2에 Dynamic하게 Task factory처럼 Task들을 만들어내는 방법이 소개되어 있는데,

task_id는 Variables를 통해 Dynamic하게 만들고, TSV_tasks라는변수 하나에 List comprehension을 통해 Iterate하게 Task들을 만들어 내고 있습니다.

 


병렬처리가 가능한 부분은 병렬로 처리

자원의 한정이 있는 EC2 서버작업을 제외하고 나머지 glue crawler, EMR Serverless, Slack에 알림보내기, Email같은 경우는 병렬로 처리해도 상관 없기 때문에 TaskGroup을 통해 병렬로 실행하는 것으로 변경하였습니다.

with TaskGroup(group_id='Parquet') as Parquet:
	# task_id가 Variable에 의해 dynamic하게 정의된다.
    Parquet = [PythonOperator(task_id=f"{i['task_id'}" for i in variables]
    
    Parquet
    
with TaskGroup(group_id='Crawler') as Crawler:
	# task_id가 Variable에 의해 dynamic하게 정의된다.
    Crawler = [Glue~~(task_id=f"{i['task_id'}" for i in variables]
    
    Crawler
    
with TaskGroup(group_id='Slack') as Slack:
	# task_id가 Variable에 의해 dynamic하게 정의된다.
    Slack = [PythonOperator~~(task_id=f"{i['task_id'}" for i in variables]
    
    Slack
    
with TaskGroup(group_id='Email') as Email:
	# task_id가 Variable에 의해 dynamic하게 정의된다.
    Email = [PythonOperator~~(task_id=f"{i['task_id'}" for i in variables]
    
    Email
    
Parquet >> Crawler >> Slack >> Email

 


2-2) TSV 처리

TSV의 경우 EC2서버에서 Python file로 처리하고 있기 때문에, 한정된 자원이라는 점을 고려하여 Sequential 하게 구성하였습니다.

# 예시로 아래와 같은 코드를 구성했습니다.
# chain(*task_list)의 형태는 리스트안에 여러 태스크를 정의한 것을 순차적으로 처리한다는 뜻
# 아무런 것도 정의하지 않는 경우 TSV_tasks는 병렬로 실행 됨
with TaskGroup(group_id='TSV') as TSV:
    TSV_tasks = [SSHOperator(task_id=f"{i['task_id'}",ssh_conn_id="ec2연결",command=f"python3 /~" for i in variables]
    chain(*TSV_tasks) # TaskGroup밑에 정의된 태스크들을 어떤 형태로 실행시킬 지 정의해줘야 함

2-3) Parquet처리

Parquet의 경우 EMR Serverless를 통해 처리하고 있기 때문에, 여러개의 ETL작업을 병렬로 처리해도 상관 없기 때문에 병렬 처리로 구성하였습니다.

# 아무런 것도 정의하지 않는 경우 Parquet_tasks 병렬로 실행 됨
with TaskGroup(group_id='Parquet') as Parquet:
    Parquet_tasks = [PythonOperator(task_id=f"{i['task_id'}" /~" for i in variables]
    
    # 병렬로 처리하는 것을 명시적으로 보여주는 방법 list comprehension으로 병렬처리를 구현
    [i for i in Parquet_tasks]

2-4) Crawler 처리

Crawler는 정확히 한번만 중복을 제거해서 실행시키는 로직이 포함되어야 하므로 Set로 데이터를 받아온 후, list로 변환하였습니다.

with TaskGroup(group_id='Crawler') as Crawler:
	crawler_sets = set([])
    for i in Variables:
    	crawler_sets.add(i['crawler_name'])
        
    Crawler_tasks = [AWSGlueCrawlerOperator(task_id=i,aws_conn_id='aws',config(name=~) for i in list(crawler_sets)]
    [i for i in crawler_tasks]

2-5) Slack에 알람

슬랙에 각 task 별 정보를 각각 계산하는게 조금 시간이 간 작업인데, 아래와 같이 TaskInstance를 통해 구현하였습니다.

물론 for문, list에서 필요한 정보만 변수로 저장하기, 날짜 formatting 와 같은 로직이 추가로 필요하지만

핵심은 아래 TaskInstance를 통해 start_date, end_date, duration등을 가져오는 것입니다.

from airflow.models import TaskInstance

dag_instance = kwargs["dag_run"]
task_instances = dag_instance.get_task_instances()

start_date = task_instances.start_date
end_date = task_instances.end_date
duration = task_instances.duration

2-6) Email 결과 전송

email 또한 다를게 없습니다.. 각 Variable 리스트안에 dictionary로 정의된 데이터셋에 ETL별 수신자의 email주소로 보내게끔 구현했습니다.

# 아무런 것도 정의하지 않는 경우 Parquet_tasks 병렬로 실행 됨
with TaskGroup(group_id='Email') as Email:
    Email_tasks = [PythonOperator(task_id=f"{i['task_id'}" /~" for i in variables]
    
    # 병렬로 처리하는 것을 명시적으로 보여주는 방법 list comprehension으로 병렬처리를 구현
    [i for i in Email_tasks]

 

728x90

댓글