이번 포스트에는 Airflow 3.0에서 변경 되거나 도입된 다양한 Feature에 대해서 정리해보고, 이전에 docker compose를 통해 설치해본 Airflow 3.0 UI를 직접 확인해보면서 정리해보겠습니다.
Airflow 3.0 설치는 아래 링크를 참조해주세요.
https://spidyweb.tistory.com/591
[Airflow] Airflow 3.0 version 탐구 1) docker로 설치 해보기
이번에 Airflow 3.0 version이 release됐는데, 관련해서 docker로 한번 설치해보고, 변경 사항들을 직접 확인해보겠습니다. 1. AWS에 EC2 생성서버 스펙- 이미지: Amazon linux2- Instance type: c5n.large(2Vcores, 5.3GB Mem)
spidyweb.tistory.com
1. Airflow 3.0 변경점 및 도입된 점
3.0으로 업그레이드 되면서 많은 변경점이 있는데 간략하게 아래와 같은 키워드가 있습니다.
- Modernization: 성능과 보안을 향상시키기 위해 최신 기술과 Python 버전을 활용
- Simplification: 코드베이스와 사용자 경험을 간소화하기 위해 사용 중단된 기능 제거
- Compliance: 변화하는 규제 기준을 충족하기 위해 보안 기능 강화
- Community Feedback: 사용자 경험과 요청을 기반으로 한 변경사항 적용(필요 기능에 대해 설문을 계속 했었고, 반영한 것으로 보입니다. backfill, dag versioning 등등)
1) Airflow 3.0 핵심 변화 6가지
1-1. 클라이언트‑서버 아키텍처 전환
- 새로운 Task Execution Interface 도입으로 스케줄러·메타DB와 태스크 실행 환경을 분리
- Edge Executor와 함께 멀티‑클라우드·온프렘·엣지 어디서나 실행 가능
1-2. 다중 언어 Task SDK
- 태스크를 Python 외에 Go → Java → JavaScript 순으로 지원 예정
- DAG는 그대로 Python, 내부 태스크만 다른 언어로 작성 가능
- Airflow AI SDK, PydanticAI(https://github.com/astronomer/airflow-ai-sdk)
1-3. 모던 React UI + DAG 버전 관리
- Flask AppBuilder 제거, React + FastAPI 기반으로 실시간 새로고침·다크모드·플러그인 슬롯 제공. ‑ DAG Versioning·Bundle 다운로드 기능이 기본 탑재
- 2.x 에서는 Flask와 Jinja2 + jQuery/Bootstrap로 구성되어 있던 UI를 React와 FastAPI로 변경
- Dark Mode를 지원

| 버전 | UI 기술 스택 | 특징 |
| 1.x | Flask + Jinja2 + 기본 JS/jQuery | 매우 기본적인 UI |
| 2.x | Flask + Jinja2 + jQuery/Bootstrap | UI 개선, DAG 보기 향상 |
| 3.x (예정/알파) | React (프론트) + FastAPI (백엔드 API) | 모던 SPA 방식, 성능 및 UX 대폭 개선 |
1-4. Data Asset 모델 & 이벤트 기반 스케줄링
- 기존 Dataset을 확장해 @asset 데코레이터로 자산 간 의존성을 선언
- 외부 메시지 버스(SQS 등) 이벤트로 즉시 트리거 가능
- Real-time event-driven data pipelines 구축이 가능
- workflow는 cron-based 고정된 스케줄링으로 작업을 기다리기 보다, 새로운 데이터가 도착하자마자 실행이 가능하게끔 변경
- 3.0 이전에는 2가지 방법이 있었음
- You wait for the event with a sensor
- You have an AWS Lambda using the Airflow API to trigger your pipeline - 하지만 아래와 같은 문제점이 존재
❌ Latency
❌ Waste of resources (sensors waiting)
❌ Rely on a third-party service
❌ Non-native mechanism - AssetWatcher를 통해 외부 자원을 모니터링
- 큐(현재는 SQS만 지원됨)를 참조하는 Trigger를 생성
- 위에서 만든 Trigger를 감시하는 Asset을 정의
- 해당 Asset을 기준으로 파이프라인을 스케줄링
- 아래 코드에 나온 것처럼 이벤트를 읽어옴

1-5. Scheduler‑Managed Backfill & 대규모 재실행 & 하나의 DAG에 여러 스케줄 지정가능
- 백필 작업을 스케줄러가 자동 분산·모니터링하여 ML 재학습·대량 ETL에 최적화
- backfill을 기존에 CLI를 통해 사용했어야 했다면, 3.0에서는 UI를 통해 쉽게 사용 가능
- MultipleCronTriggerTimetable를 통해 하나의 DAG에 여러 스케줄링을 설정 가능
from airflow.timetables.trigger import MultipleCronTriggerTimetable
# At 1:10 and 2:40 each day.
@dag(schedule=MultipleCronTriggerTimetable("10 1 * * *", "40 2 * * *", timezone="UTC"), ...)
def example_dag():
pass
backfill 수행 with UI

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/timetable.html
Timetables — Airflow Documentation
airflow.apache.org
1-6. DAG Versioning & Dag Bundle
- DAG에 대한 버전을 지원
- 버전을 통해 DAG의 변경 내용들이 Metadata에 저장되어 트래킹
- rollback하기가 쉬워짐


- DAG Bundle: DAG 정의, 종속성 코드, 설정, 리소스를 하나의 단위로 패키징해서 Airflow에 배포할 수 있는 새로운 패키징 포맷
bundle Yaml 예시
dag_bundle:
name: "my_cool_dag"
version: "1.0.0"
entrypoint: "dags/my_dag.py"
requirements: "requirements.txt"
| 기능 | 설명 |
| 배포 단위화 | DAG 단위로 격리된 번들 생성 가능 |
| 의존성 포함 가능 | requirements.txt 포함 가능 |
| 로컬 테스트 용이 | 번들 단위로 실행 가능 (airflow dags test-bundle) |
| 버전 명시 가능 | 각 번들마다 명확한 버전 명시 |
| 트래킹/롤백 가능성 증가 | 번들 변경 이력 추적이 쉬움 |
| 배포 툴과 통합 용이 | GitOps, CI/CD 환경에 적합 |
https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/dag-bundles.html
1-7. 레거시 정리와 보안 강화
- Python 3.9 이상 필수, 2.x에서 deprecated된 코드 전면 삭제.
- Task Isolation으로 워커가 메타DB에 직접 접근하지 않아 다중 테넌시 감사성 상승
| 구분 | AS-IS(Airflow 2.x) | TO-BE(Airflow 3.x) | 변경 이유 | 기대 효과 및 대비사항 |
| Python Version | Python 3.7과 그 이상의 버전을 지원 |
|
|
|
| deprecated 코드 삭제 | deprecated 된 코드 잔존 | airflow 2.x의 모든 deprecated 된 코드를 삭제 |
|
|
| 보안 및 규정 준수 기능 강화 |
|
|
|
|
2) 자잘한 변경점들
2-1) 성능 개선
- 스케줄러 최적화: 더 빠른 태스크 실행을 위한 스케줄러 성능 개선
- 비동기 처리 도입: 더 많은 작업을 동시에 처리할 수 있도록 비동기 프로그래밍 적용
- 리소스 관리 개선: 확장성을 고려한 리소스 처리 능력 향상
2-2) 사라진 기능들
- execution_date: execution_date가 사라지고 대신 dag_run.logical_date를 이용하도록 권고
- SugDagOperator / SubDAGs: SubDag 대신에 TaskGroups를 이용
- SLA callbacks / metrics: 추후에 Deadline Alerts 예정
- DAG / Xcom Pickling: 파이썬 객체를 바이트 스트림으로 변환하는 것을 Pickling, 반대는 Unpickling이라 하는데 이것도 더이상 지원하지 않음, DAG Pickling의 경우 JSON 직렬화를, Xcom의 경우 Custom Xcom backend를 이용
- DebugExecutor: DebugExecuor는 없어지고 대신 LocalExecutor를 이용
2. Airflow UI
1) Home
- Light / Dark Mode Toggle 새로 도입(User 메뉴에 존재)
- Home에서는 Airflow에서 실행되는 각 요소, MetaDatabase, Scheduler, Triggerer, Dag Processor에 대한 Health 체크가 한눈에 보이고 DAG의 상태 히스토리를 보여줌
- 기존 버전은 Menu가 상단에 있었다면, Airflow 3.0에서는 좌측으로 변경 되었음


2) Dags
리스트 형태와 아이콘 형태를 지원


특정 DAG를 수동 실행 시킬 때 Advanced Option을 통해 Run ID나 Notes, Configuration을 설정할 수 있다.

DAG는 바 형태와 그래프 형태, 2가지 형태로 확인 할 수 있으며, Task 실행 시는 아래와 같은 정보를 제공
- Logs
- Rendered Templates
- XCom
- Events
- Code
- Details
또한 DAG Version을 우측 상단에서 확인 할 수 있음

Task의 Details에서는 Dag의 Version ID 및 상세 정보를 확인 가능

3) Assets
- Assets은 Airflow2에 있던 Dataset보다 넓은 개념으로, Airflow의 관점에서 실체화된 어떤 자원(resource), Dataset도 일종의 Asset이지만, Asset은 더 포괄적
- Dataset은 Airflow에서 하나의 데이터 소스 또는 데이터 객체를 표현하는 개념(DAG 간 데이터 의존성을 위한 식별 가능한 데이터 단위 (파일, 테이블 등))
- "이 DAG은 특정 Dataset이 변경되었을 때 실행되어야 한다"와 같은 데이터 기반 트리거를 구현하는 데 사용
- 좋은 점은 어느 DAG, Task에서 사용 중인지 알 수 있음

클릭하면 다음과 같이 흐름도를 볼 수 있어 어느 DAG에서 사용 중인지 보기 쉽게 되어 있음

- Airflow 3.0에서 말하는 Producing Tasks와 Consuming DAGs는 데이터 중심 파이프라인(Data-aware scheduling) 기능에서 핵심적인 개념
- 이 기능은 DAG 간의 데이터 기반 의존성을 명확하게 표현하고 자동으로 DAG을 트리거할 수 있도록 도움
- Producing Tasks
- 특정 Dataset을 생성하거나 갱신하는 Task
- outlets=[Dataset()]을 통해 Dataset을 생성하는 Task
- Consuming DAGs
- 특정 Dataset이 업데이트되면 실행되는 DAG
- schedule=[Dataset()]을 통해 Dataset 변경에 반응하는 DAG
- 즉, 하나의 Task가 어떤 Dataset을 만들면, 그 Dataset을 사용해야 하는 다른 DAG이 자동으로 실행
Producing Tasks
from airflow import DAG
from airflow.operators.python import PythonOperator
def create_customer_data():
# 실제로는 S3에 파일 생성하는 로직
print("Customer data created")
with DAG(dag_id="produce_dag", schedule="@daily", start_date=datetime(2024, 1, 1)) as dag1:
task = PythonOperator(
task_id="produce_data",
python_callable=create_customer_data,
outlets=[customer_data], # 이 Task가 Dataset을 '생성함'
)
Consuming DAGs
from airflow import DAG
from airflow.operators.python import PythonOperator
def use_customer_data():
print("Customer data consumed")
with DAG(dag_id="consume_dag", schedule=[customer_data], start_date=datetime(2024, 1, 1)) as dag2:
task = PythonOperator(
task_id="consume_data",
python_callable=use_customer_data,
)
4) browse
Audit Logs를 통해 시스템 내에서 발생하는 중요한 이벤트들을 기록하여 누가 언제 어떤 작업을 했는지 추적할 수 있으며,
보안, 컴플라이언스, 문제 해결, 사용자 행동 추적 등의 목적으로 사용
또한 XComs(Cross Communications)을 통해 태스크 간에 데이터를 주고받은 정보를 확인 할 수 있음
- Audit Logs
- XComs

5) admin
Variables 및 Pool, Provider, Plugin, Connection을 관리할 수 있는 메뉴
- Variables
- Pools
- Providers
- Plugins
- Connections

6) Security
아직까지 3.0.1 버전에서도 아래 5개 목록을 클릭하면 이전의 버전 UI가 나오게 되고, Dark mode도 풀리게 됩니다.
- Users
- Roles
- Actions
- Resources
- Permissions

3. 간단한 DAG 생성 및 실습
만들어둔 ./dags 경로에 새로운 dag를 하나 생성합니다.
from airflow import DAG
from airflow.decorators import task
from airflow.timetables.trigger import MultipleCronTriggerTimetable
from datetime import datetime
with DAG(
dag_id="example_airflow_3_features_dag",
description="Test Airflow 3.0 features: versioning, managed backfill, scalable reruns, multiple cron",
start_date=datetime(2024, 6, 1),
catchup=True,
max_active_runs=3,
schedule=MultipleCronTriggerTimetable(
"0 0 * * *", # 매일 자정
"0 12 * * 1-5", # 평일 낮 12시
timezone="Asia/Seoul"
),
tags=["airflow_3.0", "versioning", "backfill", "cron"],
) as dag:
@task
def print_context(**kwargs):
run_id = kwargs.get("run_id")
data_interval_start = kwargs.get("data_interval_start")
data_interval_end = kwargs.get("data_interval_end")
logical_date = kwargs.get("logical_date") # execution_date의 대체
print(f"Run ID: {run_id}")
print(f"Data Interval Start: {data_interval_start}")
print(f"Data Interval End: {data_interval_end}")
print(f"Logical Date: {logical_date}") # 여기로 변경
""" 이후에 추가
@task
def add_task():
print("추가된 task")
"""
print_context() #>> add_task()
UI에서 확인 할 수 있는 생성된 DAG


backfill도 정상적으로 수행되는 것을 알 수 있고, Trigger버튼을 통해 수동으로도 수행시킬 수 도 있습니다.

위의 코드에서 task를 추가하거나 변경하게 되는 경우에 DAG version이 v2로 변경되는 것을 확인할 수 있음

'BigData > Apache Airflow' 카테고리의 다른 글
| [Airflow] Airflow Backfill에 대해서 정리하기 (Feat. catchup) (0) | 2025.03.25 |
|---|---|
| [Airflow] Airflow 3.0 version 탐구, docker로 설치 해보기 (0) | 2025.03.25 |
| [Airflow] Airflow 개념과 전체적인 구조 정리 (0) | 2025.01.17 |
| [Airflow] Airflow 3.0 version 변경점 정리 (0) | 2025.01.17 |
| [Airflow] TO-BE Batch job 프로세스 개선 - 3) DAG 이전 및 이슈 정리 (0) | 2024.06.22 |
댓글