안녕하세요 이번 포스트에는 Airflow의 개념에 대해서 알아보고 어떤 역할을 하는지, 그리고 핵심인 DAG파일에 대해서 알아보겠습니다.
1. Apahce Airflow란?
- 에어비앤비에서 python기반으로 개발한 워크플로우 스케줄링, 모니터링 플랫폼
- workflow management tool
- oozie와 luigi와 같이 데이터파이프라인을 구성할 때 사용된다.
2. airflow 의 특징 및 장점
- airflow 파이프라인(동작순서, 방식)을 파이썬 코드를 이용하여 DAG를 구성하기 때문에 동적인 구성이 가능
- oozie와 같은 ui로 구성하는 것에 비해 비교적 큰 파이프라인을 코드로써 편하게 구성할 수 있다.
- airflow webserver가 있어 웹 UI를 표현하고, workflow 상태를 표시,실행, 재시작, 수동 조작, 로그확인 할 수 있다.
- 간결하고 명시적이며, 진자 템플릿(jinja template)을 이용하여 파라미터화 된 데이터를 전달하고 자동으로 파이프라인을 생성 가능
- 분산구조와 메시지큐를 이용하여 많은수의 워커간의 협업을 지원, 스케일 아웃이 가능
- tree, graph, gantt chart 등 다양한 형태로 모니터링 가능
3. airflow가 사용되는 곳, 사용하는 이유
- 데이터파이프라인에서는 ETL(Extract, Transform, Load)속 수 많은 로직중에 선행 되어야 할, 후행 되어야 할 작업간의 의존성과 같은 것을 한눈에 알아 볼 수 있게끔 코드로써 구성하고, UI로 관리/모니터링할 수 있어서 사용
- 적은 수의 의존관계나 스케줄링작업은 cron으로 해결하는 케이스가 있지만, 관리할 로직이 늘어 났을 때의 복잡함을 한눈에 알아볼 수 있어서 사용
- batch 작업을 스케줄링하여 돌릴 때 사용
4. Airflow 구성
Webserver
- Web UI 제공
- Workflow의 전반적인 조작 및 기록 관리
Scheduler
- DAG 가 저장된 디렉토리(DAG Bag)를 주기적으로 스캔하여 DAG를 최신화함
- 스케쥴에 따라서 DAG 에 정의된 Task 들을 순차적으로 실행
- Task 를 실행할 Worker를 지정하여 작업을 명령
MetaDB
- DAG 정보, 스케쥴링 정보, 작업 실행 결과 등의 메타데이터를 저장
- SQLite(SQLite는 다중 접근이 안되기 때문에 분산 처리 불가), PostgreSQL, MySQL 등의 DB 를 지원
Worker
- Airflow의 Task 가 실행되는 환경
- Executor 설정에 의하여 Local 에서 실행되거나, 별도의 노드 또는 컨테이너에서 실행됨
DAGs
- Directed Acyclic Graph, Task 간의 관계와 순서(dependency)를 정의, Task 는 Operator 로 구현됨
5. Airflow의 Executor
Sequential Executor :
- Worker 가 한번에 하나의 Task만 수행할수 있어 제약적임
Local Executor :
- Worker 가 Scheduler 와 같은 서버에서 실행됨
Celery Executor :
- Worker 노드를 별도 구성 → worker 수 scale out 가능
- Task를 메시지 브로커에 전달하고, Celery Worker가 Task를 가져가서 실행하는 방식
Kubernetes Executor :
- 컨테이너 환경에서 Worker 가 실행되도록 구성
- Task를 스케줄러가 실행가능 상태로 변경하면 메시지 브로커에 전달하는게 아니라 Kubernetes API를 사용하여 Airflow 워커를 pod 형태로 실행
- 매 Task마다 pod가 생성되므로 가볍고, Worker에 대한 유지 보수가 필요없다는 장점
- Kubernetes를 활용하여 지속적으로 자원을 점유하지 않기 때문에 효율적으로 자원을 사용
- 짧은 Task에도 pod을 생성하는 overhead가 있으며, celery executor에 비해 자료가 적고 구성이 복잡하다는 단점
6. Airflow의 DAG
1) default_args
default_args = {
'owner': 'ownerid',
'retries': 0,
'retry_delay': timedelta(seconds=20),
'depends_on_past': False
}
2) DAG
dag_name = DAG(
'dag_name',
start_date=days_ago(2),
default_args=default_args,
schedule_interval='@once',
catchup=False,
is_paused_upon_creation=False,
)
#start_date=days_ago(2)에서 2는 샘플이며, 0,1,2,..10... 실행가능하다.
속성 | 해석 |
dag_name | dag이름 |
default_args | default_args에서 설정한 값 |
start_date | dag 내에서의 start_date는 dag가 scheduling되는 시각. 현재 날짜 이전으로 설정하면 생성 즉시 scheduling으로 되며, 현재 날짜,시각 이후로 설정하면 해당 날짜,시각에 scheduling된다. |
schedule_interval | 스케줄링 간격 @once는 한번만 실행, timedelta(days=1)는 하루마다 실행 "0/10 * * * *" 과 같이 cron expression으로 10분 간격으로 실행을 설정할 수 있다. |
execution_date | airflow 2.2v 기준 data interval의 start지점, 제출하고 싶은 시점의 데이터의 시작지점 ex) 2021-12-13 00:00 ~ 2021-12-13 23:59의 데이터를 job 돌리고싶을 때 execution_date는 2021-12-13 00:00 |
catchup | Scheduler는 DAG의 전체 lifetime을 확인하면서 실행되지 않은 DAG를 실행시, 이렇게 정해진 시간에 실행되지 못한 DAG를 늦게라도 실행하는 것 true일 경우 과거에 실행되지 못한 DAG실행시킴. |
is_paused_upon_creation | DAG가 최초 생성되었을 때 멈춘다. dag가 이미 존재한다면, 무시된다. default값은 true |
3) 에어플로우의 시간(스케줄링)
start_date:
- Airflow에서 start_date는 실행 시작 날짜가 아니라 스케줄이 시작 되는 날짜
execution_date:
- 나중에 해당 DAG를 다시 실행해도 그대로 해당 값으로 유지되는 DAG고유 실행 Id 같은 값
- 1월 1일이 execution_date 인 경우 ETL관점에서 증분을 생각하면 이해하기 쉽습니다. execution_date가 1월 1일인 task의 경우 1월1일의 data를 가지고 ETL을 한다고 예상가능하며, 1월1일 데이터는 1월2일이 되어야 모두 존재하기 때문에 Airflow는 이런 방식의 시간을 설정
schedule_interval:
- 스케쥴 되는 간격
- cron expression으로도 사용가능하며(@once,@daily도 가능)
preset meaning cron
None | Don’t schedule, use for exclusively “externally triggered” DAGs | |
@once | Schedule once and only once | |
@hourly | Run once an hour at the beginning of the hour | 0 * * * * |
@daily | Run once a day at midnight | 0 0 * * * |
@weekly | Run once a week at midnight on Sunday morning | 0 0 * * 0 |
@monthly | Run once a month at midnight of the first day of the month | 0 0 1 * * |
@quarterly | Run once a quarter at midnight on the first day | 0 0 1 */3 * |
@yearly | Run once a year at midnight of January 1 | 0 0 1 1 * |
data interval:
- airflow 2.2v 부터는 혼란스러운 execution을 대체할 개념인 data interval이 나옴
- data interval의 start시간이 execution_date
- data interval의 end시간을 실제 실행이 시작되는 시간
- data interval의 end시간 - start시간 = schedule_interval
이해가 잘안가시는 분은 아래의 링크를 참조해 주세요.
https://spidyweb.tistory.com/309
4) Task
task_start = DummyOperator(
task_id='start',
dag=dag_runAirflow_v1
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
5) 의존성 구성
task_start >> [task_af_01, task_af_02, task_af03] >> task_next1
task_next1 >> [task_af_04, task_af_05, task_af_06] >> task_next2
task_next2 >> [task_af_07, task_af_08, task_af_09] >>task_finish
아직 내용이 많이 부족하며 차차 수정해 나가겠습니다.
*2021-11-19 수정
*2021-12-14 수정
참조:
https://www.comtec.kr/2021/08/09/airflow-tutorial/
https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#tasks
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html
댓글