BigData/Apache Airflow

[BigData] Apache Airflow 설치 및 실습 하기 series (1) Airflow란? DAG란?

스파이디웹 2021. 10. 17. 21:06
728x90

안녕하세요 이번 포스트에는 Airflow의 개념에 대해서 알아보고 어떤 역할을 하는지, 그리고 핵심인 DAG파일에 대해서 알아보겠습니다.


1. Apahce Airflow란?

  • 에어비앤비에서 python기반으로 개발한 워크플로우 스케줄링, 모니터링 플랫폼
  • workflow management tool
  • oozie와 luigi와 같이 데이터파이프라인을 구성할 때 사용된다.

2. airflow 의 특징 및 장점

  • airflow 파이프라인(동작순서, 방식)을 파이썬 코드를 이용하여 DAG를 구성하기 때문에 동적인 구성이 가능
  • oozie와 같은 ui로 구성하는 것에 비해 비교적 큰 파이프라인을 코드로써 편하게 구성할 수 있다.
  • airflow webserver가 있어 웹 UI를 표현하고, workflow 상태를 표시,실행, 재시작, 수동 조작, 로그확인 할 수 있다.
  • 간결하고 명시적이며, 진자 템플릿(jinja template)을 이용하여 파라미터화 된 데이터를 전달하고 자동으로 파이프라인을 생성 가능
  • 분산구조와 메시지큐를 이용하여 많은수의 워커간의 협업을 지원, 스케일 아웃이 가능
  • tree, graph, gantt chart 등 다양한 형태로 모니터링 가능

3. airflow가 사용되는 곳, 사용하는 이유

  • 데이터파이프라인에서는 ETL(Extract, Transform, Load)속 수 많은 로직중에 선행 되어야 할, 후행 되어야 할 작업간의 의존성과 같은 것을 한눈에 알아 볼 수 있게끔 코드로써 구성하고, UI로 관리/모니터링할 수 있어서 사용
  • 적은 수의 의존관계나 스케줄링작업은 cron으로 해결하는 케이스가 있지만, 관리할 로직이 늘어 났을 때의 복잡함을 한눈에 알아볼 수 있어서 사용
  • batch 작업을 스케줄링하여 돌릴 때 사용

4. Airflow 구성

출처 : https://airflow.apache.org/docs/apache-airflow/stable/concepts.html

Webserver

  • Web UI 제공
  • Workflow의 전반적인 조작 및 기록 관리

Scheduler

  • DAG 가 저장된 디렉토리(DAG Bag)를 주기적으로 스캔하여 DAG를 최신화함
  • 스케쥴에 따라서 DAG 에 정의된 Task 들을 순차적으로 실행
  • Task 를 실행할 Worker를 지정하여 작업을 명령

MetaDB

  • DAG 정보, 스케쥴링 정보, 작업 실행 결과 등의 메타데이터를 저장
  • SQLite(SQLite는 다중 접근이 안되기 때문에 분산 처리 불가), PostgreSQL, MySQL 등의 DB 를 지원

Worker

  • Airflow의 Task 가 실행되는 환경
  • Executor 설정에 의하여 Local 에서 실행되거나, 별도의 노드 또는 컨테이너에서 실행됨

DAGs

  • Directed Acyclic Graph, Task 간의 관계와 순서(dependency)를 정의, Task 는 Operator 로 구현됨

5. Airflow의 Executor

 

Sequential Executor :

  • Worker 가 한번에 하나의 Task만 수행할수 있어 제약적임

Local Executor :

  • Worker 가 Scheduler 와 같은 서버에서 실행됨

Celery Executor :

  • Worker 노드를 별도 구성 → worker 수 scale out 가능
  • Task를 메시지 브로커에 전달하고, Celery Worker가 Task를 가져가서 실행하는 방식

Kubernetes Executor :

  • 컨테이너 환경에서 Worker 가 실행되도록 구성
  • Task를 스케줄러가 실행가능 상태로 변경하면 메시지 브로커에 전달하는게 아니라 Kubernetes API를 사용하여 Airflow 워커를 pod 형태로 실행
  • 매 Task마다 pod가 생성되므로 가볍고, Worker에 대한 유지 보수가 필요없다는 장점
  • Kubernetes를 활용하여 지속적으로 자원을 점유하지 않기 때문에 효율적으로 자원을 사용
  • 짧은 Task에도 pod을 생성하는 overhead가 있으며, celery executor에 비해 자료가 적고 구성이 복잡하다는 단점

출처 : https://airflow.apache.org/docs/apache-airflow/stable/executor/kubernetes.html

 


6. Airflow의 DAG

1) default_args

default_args = {
'owner': 'ownerid',
'retries': 0,
'retry_delay': timedelta(seconds=20),
'depends_on_past': False
}

 

2) DAG

dag_name = DAG(
    'dag_name',
    start_date=days_ago(2),
    default_args=default_args,
    schedule_interval='@once',
    catchup=False,
    is_paused_upon_creation=False,
)

#start_date=days_ago(2)에서 2는 샘플이며, 0,1,2,..10... 실행가능하다.

속성 해석
dag_name  dag이름
default_args  default_args에서 설정한 값
start_date   dag 내에서의 start_date는 dag가 scheduling되는 시각.
현재 날짜 이전으로 설정하면 생성 즉시 scheduling으로 되며,
현재 날짜,시각 이후로 설정하면 해당 날짜,시각에 scheduling된다.
schedule_interval  스케줄링 간격 @once는 한번만 실행, timedelta(days=1)는 하루마다 실행
"0/10 * * * *" 과 같이 cron expression으로 10분 간격으로 실행을 설정할 수 있다.
execution_date airflow 2.2v 기준 data interval의 start지점,
제출하고 싶은 시점의 데이터의 시작지점
ex)
2021-12-13 00:00 ~ 2021-12-13 23:59의 데이터를 job 돌리고싶을 때 execution_date는 2021-12-13 00:00
catchup  Scheduler는 DAG의 전체 lifetime을 확인하면서 실행되지 않은 DAG를 실행시, 이렇게 정해진 시간에 실행되지 못한 DAG를 늦게라도 실행하는 것 true일 경우 과거에 실행되지 못한 DAG실행시킴.
is_paused_upon_creation  DAG가 최초 생성되었을 때 멈춘다. dag가 이미 존재한다면, 무시된다. default값은 true

3) 에어플로우의 시간(스케줄링)

start_date: 

  • Airflow에서 start_date는 실행 시작 날짜가 아니라 스케줄이 시작 되는 날짜

execution_date

  • 나중에 해당 DAG를 다시 실행해도 그대로 해당 값으로 유지되는 DAG고유 실행 Id 같은 값
  • 1월 1일이 execution_date 인 경우 ETL관점에서 증분을 생각하면 이해하기 쉽습니다. execution_date가 1월 1일인 task의 경우 1월1일의 data를 가지고 ETL을 한다고 예상가능하며, 1월1일 데이터는 1월2일이 되어야 모두 존재하기 때문에 Airflow는 이런 방식의 시간을 설정

schedule_interval

  • 스케쥴 되는 간격
  • cron expression으로도 사용가능하며(@once,@daily도 가능)

preset meaning cron

None Don’t schedule, use for exclusively “externally triggered” DAGs  
@once Schedule once and only once  
@hourly Run once an hour at the beginning of the hour 0 * * * *
@daily Run once a day at midnight 0 0 * * *
@weekly Run once a week at midnight on Sunday morning 0 0 * * 0
@monthly Run once a month at midnight of the first day of the month 0 0 1 * *
@quarterly Run once a quarter at midnight on the first day 0 0 1 */3 *
@yearly Run once a year at midnight of January 1 0 0 1 1 *

https://www.bucketplace.co.kr/post/2021-04-13-%EB%B2%84%ED%82%B7%ED%94%8C%EB%A0%88%EC%9D%B4%EC%8A%A4-airflow-%EB%8F%84%EC%9E%85%EA%B8%B0/

data interval:

  • airflow 2.2v 부터는 혼란스러운 execution을 대체할 개념인 data interval이 나옴
  • data interval의 start시간이 execution_date
  • data interval의 end시간을 실제 실행이 시작되는 시간
  • data interval의 end시간 - start시간 = schedule_interval

이해가 잘안가시는 분은 아래의 링크를 참조해 주세요.

https://spidyweb.tistory.com/309

 

[BigData] Apache Airflow 설치 및 실습하기 series (3) Airflow로 spark-submit(pyspark)하기with BashOperator

이번 포스트에는 spark-submit실습한 파일을 가지고 설치한 airflow로 spark-submit해보도록 하겠습니다. spark-submit을 통해 만든 .py file과 실습내용은 아래의 링크에 있습니다. https://spidyweb.tistory.com..

spidyweb.tistory.com

4) Task

task_start = DummyOperator(
task_id='start',
dag=dag_runAirflow_v1
)

 

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
)


5) 의존성 구성

task_start >> [task_af_01, task_af_02, task_af03] >> task_next1

task_next1 >> [task_af_04, task_af_05, task_af_06] >> task_next2

task_next2 >> [task_af_07, task_af_08, task_af_09] >>task_finish 


아직 내용이 많이 부족하며 차차 수정해 나가겠습니다.

*2021-11-19 수정

*2021-12-14 수정

 

 

참조:

https://www.comtec.kr/2021/08/09/airflow-tutorial/

https://www.bucketplace.co.kr/post/2021-04-13-%EB%B2%84%ED%82%B7%ED%94%8C%EB%A0%88%EC%9D%B4%EC%8A%A4-airflow-%EB%8F%84%EC%9E%85%EA%B8%B0/

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#tasks

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html

https://www.bucketplace.co.kr/post/2021-04-13-%EB%B2%84%ED%82%B7%ED%94%8C%EB%A0%88%EC%9D%B4%EC%8A%A4-airflow-%EB%8F%84%EC%9E%85%EA%B8%B0/

https://blog.bsk.im/2021/03/21/apache-airflow-aip-39/

728x90