본문 바로가기
BigData/Apache Airflow

[Airflow] Airflow 3.0 Feature, 변경 사항 및 UI 한번에 정리하기

by 스파이디웹 2025. 5. 27.
728x90

이번 포스트에는 Airflow 3.0에서 변경 되거나 도입된 다양한 Feature에 대해서 정리해보고, 이전에 docker compose를 통해 설치해본 Airflow 3.0 UI를 직접 확인해보면서 정리해보겠습니다.

Airflow 3.0 설치는 아래 링크를 참조해주세요.

https://spidyweb.tistory.com/591

 

[Airflow] Airflow 3.0 version 탐구 1) docker로 설치 해보기

이번에 Airflow 3.0 version이 release됐는데, 관련해서 docker로 한번 설치해보고, 변경 사항들을 직접 확인해보겠습니다. 1. AWS에 EC2 생성서버 스펙- 이미지: Amazon linux2- Instance type: c5n.large(2Vcores, 5.3GB Mem)

spidyweb.tistory.com


1. Airflow 3.0 변경점 및 도입된 점

3.0으로 업그레이드 되면서 많은 변경점이 있는데 간략하게 아래와 같은 키워드가 있습니다.

  • Modernization: 성능과 보안을 향상시키기 위해 최신 기술과 Python 버전을 활용
  • Simplification: 코드베이스와 사용자 경험을 간소화하기 위해 사용 중단된 기능 제거
  • Compliance: 변화하는 규제 기준을 충족하기 위해 보안 기능 강화
  • Community Feedback: 사용자 경험과 요청을 기반으로 한 변경사항 적용(필요 기능에 대해 설문을 계속 했었고, 반영한 것으로 보입니다. backfill, dag versioning 등등)

1) Airflow 3.0 핵심 변화 6가지

1-1. 클라이언트‑서버 아키텍처 전환

  • 새로운 Task Execution Interface 도입으로 스케줄러·메타DB와 태스크 실행 환경을 분리
  •  Edge Executor와 함께 멀티‑클라우드·온프렘·엣지 어디서나 실행 가능

 

1-2. 다중 언어 Task SDK

  • 태스크를 Python 외에 Go → Java → JavaScript 순으로 지원 예정
  •  DAG는 그대로 Python, 내부 태스크만 다른 언어로 작성 가능
  • Airflow AI SDK, PydanticAI(https://github.com/astronomer/airflow-ai-sdk)

 

1-3. 모던 React UI + DAG 버전 관리

  • Flask AppBuilder 제거, React + FastAPI 기반으로 실시간 새로고침·다크모드·플러그인 슬롯 제공. ‑ DAG Versioning·Bundle 다운로드 기능이 기본 탑재
  • 2.x 에서는 Flask와 Jinja2 + jQuery/Bootstrap로 구성되어 있던 UI를 React와 FastAPI로 변경
  • Dark Mode를 지원

버전 UI 기술 스택 특징
1.x Flask + Jinja2 + 기본 JS/jQuery 매우 기본적인 UI
2.x Flask + Jinja2 + jQuery/Bootstrap UI 개선, DAG 보기 향상
3.x (예정/알파) React (프론트) + FastAPI (백엔드 API) 모던 SPA 방식, 성능 및 UX 대폭 개선

 

 

1-4. Data Asset 모델 & 이벤트 기반 스케줄링

  • 기존 Dataset을 확장해 @asset 데코레이터로 자산 간 의존성을 선언
  • 외부 메시지 버스(SQS 등) 이벤트로 즉시 트리거 가능
    • Real-time event-driven data pipelines 구축이 가능
    • workflow는 cron-based 고정된 스케줄링으로 작업을 기다리기 보다, 새로운 데이터가 도착하자마자 실행이 가능하게끔 변경
    • 3.0 이전에는 2가지 방법이 있었음
      - You wait for the event with a sensor
      - You have an AWS Lambda using the Airflow API to trigger your pipeline
    • 하지만 아래와 같은 문제점이 존재
      ❌ Latency

      ❌ Waste of resources (sensors waiting)
      ❌ Rely on a third-party service
      ❌ Non-native mechanism
    • AssetWatcher를 통해 외부 자원을 모니터링
      1. 큐(현재는 SQS만 지원됨)를 참조하는 Trigger를 생성
      2. 위에서 만든 Trigger를 감시하는 Asset을 정의
      3. 해당 Asset을 기준으로 파이프라인을 스케줄링
      4. 아래 코드에 나온 것처럼 이벤트를 읽어옴

 

1-5. Scheduler‑Managed Backfill & 대규모 재실행 &  하나의 DAG에 여러 스케줄 지정가능

  • 백필 작업을 스케줄러가 자동 분산·모니터링하여 ML 재학습·대량 ETL에 최적화
  • backfill을 기존에 CLI를 통해 사용했어야 했다면, 3.0에서는 UI를 통해 쉽게 사용 가능
  • MultipleCronTriggerTimetable를 통해 하나의 DAG에 여러 스케줄링을 설정 가능
from airflow.timetables.trigger import MultipleCronTriggerTimetable


# At 1:10 and 2:40 each day.
@dag(schedule=MultipleCronTriggerTimetable("10 1 * * *", "40 2 * * *", timezone="UTC"), ...)
def example_dag():
    pass

backfill 수행 with UI

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/timetable.html

 

Timetables — Airflow Documentation

 

airflow.apache.org

 

1-6. DAG Versioning & Dag Bundle

  • DAG에 대한 버전을 지원
  • 버전을 통해 DAG의 변경 내용들이 Metadata에 저장되어 트래킹
  • rollback하기가 쉬워짐

  • DAG Bundle: DAG 정의, 종속성 코드, 설정, 리소스를 하나의 단위로 패키징해서 Airflow에 배포할 수 있는 새로운 패키징 포맷

bundle Yaml 예시

dag_bundle:
  name: "my_cool_dag"
  version: "1.0.0"
  entrypoint: "dags/my_dag.py"
  requirements: "requirements.txt"

 

 

기능 설명
배포 단위화 DAG 단위로 격리된 번들 생성 가능
의존성 포함 가능 requirements.txt 포함 가능
로컬 테스트 용이 번들 단위로 실행 가능 (airflow dags test-bundle)
버전 명시 가능 각 번들마다 명확한 버전 명시
트래킹/롤백 가능성 증가 번들 변경 이력 추적이 쉬움
배포 툴과 통합 용이 GitOps, CI/CD 환경에 적합

 

https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/dag-bundles.html

1-7. 레거시 정리와 보안 강화

  • Python 3.9 이상 필수, 2.x에서 deprecated된 코드 전면 삭제.
  • Task Isolation으로 워커가 메타DB에 직접 접근하지 않아 다중 테넌시 감사성 상승
구분 AS-IS(Airflow 2.x) TO-BE(Airflow 3.x) 변경 이유 기대 효과 및 대비사항
Python Version Python 3.7과 그 이상의 버전을 지원
  • 최소 Python 3.9+ 필요
  • 3.9 미만의 버전은 지원 중단
  • 최신 기능 반영: 3.9의 문법과 코드 효율성, 가독성
  • 보안 강화: 이전 버전의 파이썬은 보안관련 업데이트가 없으므로 잠재적 위험성 존재
  • 유지보수 부담 감소: 지원하는 Python 버전을 줄이면 테스트 및 개발 과정이 단순화됨
  • 코드 호환성
  • 의존성 업데이트
deprecated 코드 삭제 deprecated 된 코드 잔존 airflow 2.x의 모든 deprecated 된 코드를 삭제
  • 코드베이스 간소화: 복잡성을 줄여 프로젝트의 유지보수 및 기여를 더 쉽게 만듦
  • best practice 독려: 사용자들이 최신 방식과 API를 사용하도록 유도함
  • 기존 코드 점검
  • 신규로의 테스트
보안 및 규정 준수 기능 강화
  • RBAC(역할 기반 접근 제어) 및 인증 메커니즘 등 기본적인 보안 기능 제공
  • 암호화 및 보안 설정은 선택적으로 구성 가능
  • 보안 설정 의무화: 암호화 및 인증에 대해 보안이 강화된 기본 설정 적용
  • RBAC 개선: 보다 세분화된 권한 및 역할 제공
  • 규정 준수 기능: GDPR, HIPAA 등 규제를 지원하는 도구 추가
  • 보안 위협 증가: Airflow가 점점 더 중요한 워크플로우를 다루게 되면서 보안의 중요성이 커짐
  • 규제 요건 강화: 조직들이 더 엄격한 데이터 보호 법규를 준수해야 함
  • 보안 점검
  • 교육

2) 자잘한 변경점들

2-1) 성능 개선

  • 스케줄러 최적화: 더 빠른 태스크 실행을 위한 스케줄러 성능 개선
  • 비동기 처리 도입: 더 많은 작업을 동시에 처리할 수 있도록 비동기 프로그래밍 적용
  • 리소스 관리 개선: 확장성을 고려한 리소스 처리 능력 향상

2-2) 사라진 기능들

  • execution_date: execution_date가 사라지고 대신 dag_run.logical_date를 이용하도록 권고
  • SugDagOperator / SubDAGs: SubDag 대신에 TaskGroups를 이용
  • SLA callbacks / metrics: 추후에 Deadline Alerts 예정
  • DAG / Xcom Pickling: 파이썬 객체를 바이트 스트림으로 변환하는 것을 Pickling, 반대는 Unpickling이라 하는데 이것도 더이상 지원하지 않음, DAG Pickling의 경우 JSON 직렬화를, Xcom의 경우 Custom Xcom backend를 이용
  • DebugExecutor: DebugExecuor는 없어지고 대신 LocalExecutor를 이용

2. Airflow UI

1) Home

  • Light / Dark Mode Toggle 새로 도입(User 메뉴에 존재)
  • Home에서는 Airflow에서 실행되는 각 요소, MetaDatabase, Scheduler, Triggerer, Dag Processor에 대한 Health 체크가 한눈에 보이고 DAG의 상태 히스토리를 보여줌
  • 기존 버전은 Menu가 상단에 있었다면, Airflow 3.0에서는 좌측으로 변경 되었음

light mode
dark mode


2) Dags

리스트 형태와 아이콘 형태를 지원

 

특정 DAG를 수동 실행 시킬 때 Advanced Option을 통해 Run ID나 Notes, Configuration을 설정할 수 있다.

DAG는 바 형태와 그래프 형태, 2가지 형태로 확인 할 수 있으며, Task 실행 시는 아래와 같은 정보를 제공

  • Logs
  • Rendered Templates
  • XCom
  • Events
  • Code
  • Details

또한 DAG Version을 우측 상단에서 확인 할 수 있음

Task의 Details에서는 Dag의 Version ID 및 상세 정보를 확인 가능


3) Assets 

  • AssetsAirflow2에 있던 Dataset보다 넓은 개념으로, Airflow의 관점에서 실체화된 어떤 자원(resource), Dataset도 일종의 Asset이지만, Asset은 더 포괄적
    • Dataset은 Airflow에서 하나의 데이터 소스 또는 데이터 객체를 표현하는 개념(DAG 간 데이터 의존성을 위한 식별 가능한 데이터 단위 (파일, 테이블 등))
    • "이 DAG은 특정 Dataset이 변경되었을 때 실행되어야 한다"와 같은 데이터 기반 트리거를 구현하는 데 사용
  • 좋은 점은 어느 DAG, Task에서 사용 중인지 알 수 있음

클릭하면 다음과 같이 흐름도를 볼 수 있어 어느 DAG에서 사용 중인지 보기 쉽게 되어 있음

  • Airflow 3.0에서 말하는 Producing TasksConsuming DAGs데이터 중심 파이프라인(Data-aware scheduling) 기능에서 핵심적인 개념
    • 이 기능은 DAG 간의 데이터 기반 의존성을 명확하게 표현하고 자동으로 DAG을 트리거할 수 있도록 도움
  • Producing Tasks
    • 특정 Dataset을 생성하거나 갱신하는 Task
    • outlets=[Dataset()]을 통해 Dataset을 생성하는 Task
  • Consuming DAGs
    • 특정 Dataset이 업데이트되면 실행되는 DAG
    • schedule=[Dataset()]을 통해 Dataset 변경에 반응하는 DAG
  • 즉, 하나의 Task가 어떤 Dataset을 만들면, 그 Dataset을 사용해야 하는 다른 DAG이 자동으로 실행

Producing Tasks

from airflow import DAG
from airflow.operators.python import PythonOperator

def create_customer_data():
    # 실제로는 S3에 파일 생성하는 로직
    print("Customer data created")

with DAG(dag_id="produce_dag", schedule="@daily", start_date=datetime(2024, 1, 1)) as dag1:
    task = PythonOperator(
        task_id="produce_data",
        python_callable=create_customer_data,
        outlets=[customer_data],  # 이 Task가 Dataset을 '생성함'
    )

Consuming DAGs

from airflow import DAG
from airflow.operators.python import PythonOperator

def use_customer_data():
    print("Customer data consumed")

with DAG(dag_id="consume_dag", schedule=[customer_data], start_date=datetime(2024, 1, 1)) as dag2:
    task = PythonOperator(
        task_id="consume_data",
        python_callable=use_customer_data,
    )

 

 


4) browse

Audit Logs를 통해 시스템 내에서 발생하는 중요한 이벤트들을 기록하여 누가 언제 어떤 작업을 했는지 추적할 수 있으며,
보안, 컴플라이언스, 문제 해결, 사용자 행동 추적 등의 목적으로 사용

 

또한 XComs(Cross Communications)을 통해 태스크 간에 데이터를 주고받은 정보를 확인 할 수 있음

  • Audit Logs
  • XComs

 


5) admin

Variables 및 Pool, Provider, Plugin, Connection을 관리할 수 있는 메뉴

  • Variables
  • Pools
  • Providers
  • Plugins
  • Connections


6) Security

아직까지 3.0.1 버전에서도 아래 5개 목록을 클릭하면 이전의 버전 UI가 나오게 되고, Dark mode도 풀리게 됩니다.

  • Users
  • Roles
  • Actions
  • Resources
  • Permissions


3. 간단한 DAG 생성 및 실습

만들어둔 ./dags 경로에 새로운 dag를 하나 생성합니다.

from airflow import DAG
from airflow.decorators import task
from airflow.timetables.trigger import MultipleCronTriggerTimetable
from datetime import datetime

with DAG(
    dag_id="example_airflow_3_features_dag",
    description="Test Airflow 3.0 features: versioning, managed backfill, scalable reruns, multiple cron",
    start_date=datetime(2024, 6, 1),
    catchup=True,
    max_active_runs=3,
    schedule=MultipleCronTriggerTimetable(
        "0 0 * * *",  # 매일 자정
        "0 12 * * 1-5",  # 평일 낮 12시
        timezone="Asia/Seoul"
    ),
    tags=["airflow_3.0", "versioning", "backfill", "cron"],
) as dag:

    @task
    def print_context(**kwargs):
        run_id = kwargs.get("run_id")
        data_interval_start = kwargs.get("data_interval_start")
        data_interval_end = kwargs.get("data_interval_end")
        logical_date = kwargs.get("logical_date")  # execution_date의 대체

        print(f"Run ID: {run_id}")
        print(f"Data Interval Start: {data_interval_start}")
        print(f"Data Interval End: {data_interval_end}")
        print(f"Logical Date: {logical_date}")  # 여기로 변경

    
    """ 이후에 추가
    @task
    def add_task():
        print("추가된 task")
	"""
    
    print_context() #>> add_task()

UI에서 확인 할 수 있는 생성된 DAG

다중 스케줄이 적용된 모습

backfill도 정상적으로 수행되는 것을 알 수 있고, Trigger버튼을 통해 수동으로도 수행시킬 수 도 있습니다.

 

위의 코드에서 task를 추가하거나 변경하게 되는 경우에 DAG version이 v2로 변경되는 것을 확인할 수 있음

 

728x90

댓글