본문 바로가기
BigData/Apache Airflow

[Airflow] Airflow Backfill에 대해서 정리하기 (Feat. catchup)

by 스파이디웹 2025. 3. 25.
728x90

이번 포스트에는 Airflow Backfill기능에 대해서 집중적으로 정리 해보겠습니다.

 


1. Backfill 기능이란?

한 마디로 한다면 "Airflow에서 이전에 실행되지 않은 작업(태스크)을 자동으로 실행하는 과정"

  • DAG(Directed Acyclic Graph)에서 이전의 실행 날짜나 특정 기간에 대한 작업을 다시 수행, backfill을 실행할 때 시작 날짜와 종료 날짜를 지정하여 특정 기간 동안의 작업을 재실행
  • start_date 부터 시작하지만 end_date 은 포함하지 않음
  • 주로 작업이 실패했거나 실행되지 않은 경우에 유용하게 사용
  • execution_date를 사용해서 Incremental update가 구현되어 있어야 의미가 있음, master성(full refresh는 멱등성이 보장된 방법이기에 필요가 없다.)

2. Backfill 기능이 필요한 경우

 

  • 작업 실패: 이전에 실행된 DAG의 일부 작업이 실패하거나 일부 작업이 누락된 경우
  • 새로운 태스크 추가: 기존 DAG에 새로 추가된 태스크가 이전 실행에 적용되지 않았을 때
  • 특정 날짜에 대한 작업 실행: 특정 날짜에 대해 수동으로 작업을 재실행하고 싶을 때, 

3. Backfill 실행 방법

  • Airflow에서 backfill을 실행하면, DAG의 특정 날짜 범위에 대해 태스크들을 다시 실행
  • Airflow는 주어진 날짜 범위에 대한 모든 태스크 인스턴스를 확인하고, 그 기간 동안 실행되지 않은 태스크들을 실행

dag = DAG(
    'backfill_example',
    default_args=default_args,
    description='A simple backfill example',
    schedule_interval=timedelta(days=1),  # 매일 실행
    start_date=datetime(2025, 3, 1),
    catchup=True,  # 이 옵션이 True일 경우 backfill이 발생함
    depends_on_past=True, # 이 옵션이 True일 경우, 이전의 DAG 실행이 실패 한 경우, 다음 실행이 되지 않음
)

 

  • catchup=True: start_date부터 현재까지의 모든 누락된 실행을 자동으로 backfill
  • catchup=False: start_date 이후에 발생한 실행만을 수행하며, 과거에 대한 backfill은 하지 않음

명령어 실행 방법

airflow dags backfill <DAG_ID> -s <START_DATE> -e <END_DATE> [옵션들]

 

인수 설명 예시
<DAG_ID> 백필할 DAG의 ID. 이 인수는 필수 example_dag
-s <START_DATE> 백필 시작 날짜. 백필할 시작 날짜를 설정합니다. 날짜 형식은 YYYY-MM-DD. -s 2023-01-01
-e <END_DATE> 백필 종료 날짜. 백필할 종료 날짜를 설정합니다. 날짜 형식은 YYYY-MM-DD -e 2023-01-07
-t <TASK_ID> 특정 태스크만 백필할 경우 해당 태스크의 task_id를 지정합니다. 만약 이 인수를 사용하면 지정된 태스크만 백필 -t task1
--mark_success 백필이 완료되면 해당 실행을 "성공"으로 표시합니다. 이 옵션을 사용하면 백필이 완료된 후 실행된 DAG 인스턴스는 success 상태로 설정 --mark_success
--rerun_failed_tasks 실패한 태스크만 다시 실행합니다. 기본적으로는 백필이 시작되면 모든 태스크가 실행되지만, 이 옵션을 사용하면 실패한 태스크만 실행 --rerun_failed_tasks
--exclude_queued 대기 중인 태스크를 제외하고 백필합니다. 이 옵션을 사용하면 대기 중인(queued) 상태의 태스크는 백필에서 제외 --exclude_queued
--dry_run 실제로 실행하지 않고 시뮬레이션만 합니다. 백필이 어떤 태스크를 실행할지 확인하고 싶은 경우에 사용 --dry_run
--no_confirm 사용자 확인을 생략하고 즉시 실행합니다. 이 옵션을 사용하면 백필 시작 전에 사용자 확인을 생략하고 바로 시작 --no_confirm
--verbose 백필의 상세 로그를 표시합니다. 실행 과정에서 발생하는 로그를 더 자세히 보고 싶을 때 사용 --verbose
--pool <POOL> 지정된 풀에서 백필을 실행합니다. Airflow는 태스크 실행을 풀에 따라 제한할 수 있습니다. 풀 이름을 지정하여 백필을 실행할 수 있음 --pool example_pool
--wait_for_downstream 하위 태스크가 완료될 때까지 기다립니다. 이 옵션을 사용하면, 상위 태스크가 실행된 후 하위 태스크가 완료될 때까지 기다렸다가 실행 --wait_for_downstream

trigger_rule vs depends_on_past
둘 다 의존성과 관련이 있다고 볼 수 있지만,
trigger_rule은 전 후 task에 대한 의존성을 즉, task끼리 실패 성공 여부에 따라 다음 task를 실행 할 지 판단하는 파라미터라면,
depends_on_past는 dag전체에 대한 실패 성공 여부에 따라 다음 dag를 실행 할 지 판단하는 파라미터이다.

4. 주의할 점★

backfill이란 것은 결국 설정한 start_date 값으로 부터 하나씩 차례로 dag를 실행시키는 것이기 때문에 결국 변수는 start_date(execution_date)이라는 것이 핵심

 

즉, DAG내 실행시키는 작업이 python, SQL인 경우에 변수가 start_date를 사용하지 않는 경우라면, 동일한 코드를 과거시점에서부터 현재까지 여러번 실행시키게 될 뿐, 의도한 과거의 데이터부터 현재까지의 데이터가 채워지지 않게 될 것입니다.

 

1) backfill이 적용되는 python task 예시

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_data(execution_date, **kwargs):
    # execution_date를 동적으로 활용
    print(f"Running process for {execution_date}")
    
    # 예시: 과거 날짜를 사용하여 데이터를 처리하는 로직
    year = execution_date.year
    month = execution_date.month
    day = execution_date.day
    
    # 예시로, 특정 파일이나 DB에서 데이터를 가져올 때 날짜를 기준으로 처리
    print(f"Processing data for {year}-{month:02d}-{day:02d}")
    # 여기서 필요한 로직을 구현: 파일 읽기, DB 쿼리 실행 등

    # 예시로, 특정 SQL 쿼리나 다른 작업을 실행할 수도 있습니다.
    # 예: fetch_data_from_db(execution_date) 이런 식으로

# DAG 정의
dag = DAG(
    'example_backfill_dag',
    default_args={
        'owner': 'airflow',
    },
    schedule_interval='@daily',
    start_date=datetime(2025, 3, 1),
    catchup=True  # 이 옵션이 True일 경우 backfill이 발생함
)

# PythonOperator로 실행
process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    op_args=['{{ execution_date }}'],  # execution_date를 인자로 넘겨줌
    provide_context=True,
    dag=dag,
)

2) backfill이 적용되는 SQL task 예시

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

# SQL 쿼리 예시
sql_query = """
    SELECT *
    FROM my_table
    WHERE date_column = '{{ execution_date }}';
"""

# DAG 정의
dag = DAG(
    'example_backfill_dag',
    default_args={
        'owner': 'airflow',
    },
    schedule_interval='@daily',
    start_date=datetime(2025, 3, 1),
    catchup=True  # 이 옵션이 True일 경우 backfill이 발생함
)

# SQL 쿼리 실행
query_task = PostgresOperator(
    task_id='run_sql_query',
    sql=sql_query,
    postgres_conn_id='my_postgres_connection',
    autocommit=True,
    dag=dag,
)

 

결론

DAG내에 실행되는 task에서 사용되는 job의 변수 설정이 start_date(execution_date)에 맞게 설정되어야 back fill이 의미가 있음

 

 

 

728x90

댓글