BigData/Apache Airflow

[Airflow] 하나의 DAGs에서 다른 DAGs 파일 호출하기 (TriggerDagRunOperator, ExternalTaskSensor, SSHOperator)

스파이디웹 2024. 5. 18. 23:26
728x90

Airflow에서 DAGs를 호출하는 방법은 여러가지가 있습니다. 관련해서 정리 해보겠습니다.


1. TriggerDagRunOperator

from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from dateutil.relativedelta import relativedelta
from datetime import datetime as dt
from datetime import timedelta
from datetime import datetime
import time
import pendulum
from airflow import DAG
from utils import AlertUtils
from airflow.models.baseoperator import chain
from airflow.utils.task_group import TaskGroup


local_tz=pendulum.timezone("Asia/Seoul")
today = datetime.now(local_tz)
one_days_ago_str = datetime.strftime(today - timedelta(days=1), '%Y%m%d')
v_end_date = today.strftime('%Y-%m-%d')
two_days_ago = today - timedelta(days=2)
dayOfWeek = datetime.now(local_tz).strftime("%A")
dayDecimal = datetime.now(local_tz).strftime("%d")



sql_path = '/app/airflow-dags/dags/test/'

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False
}

with DAG(
        dag_id='trigger_dag',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=two_days_ago,
        schedule_interval=None,
        tags=['test'],
        catchup=False
) as dag:
    trigger_dag = TriggerDagRunOperator(
        task_id="trigger_dag",
        trigger_dag_id="trigger_dag",
        wait_for_completion=True
    )

    trigger_dag
  • 가장 일반적인 방법
  • trigger_dag_id는 트리거할 DAG의 ID

2. ExternalTaskSensor

# parent_dag_with_sensor.py
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('parent_dag_with_sensor', default_args=default_args, schedule_interval='@daily') as dag:
    start = DummyOperator(task_id='start')

    wait_for_child = ExternalTaskSensor(
        task_id='wait_for_child_task',
        external_dag_id='child_dag',  # 감시할 DAG의 ID
        external_task_id='end',       # 감시할 Task의 ID
        mode='poke',
        timeout=600,
        poke_interval=60,
        soft_fail=True
    )

    end = DummyOperator(task_id='end')

    start >> wait_for_child >> end
  • ExternalTaskSensor는 다른 DAG의 특정 Task가 완료될 때까지 기다렸다가 다음 Task를 실행할 수 있게 해주는 Sensor
  • 이 방법은 다른 DAG의 상태를 감시하고, 해당 DAG의 특정 Task가 완료되었을 때 이후의 Task를 실행하고자 할 때 유용

3. SSHOperator로 python file 호출

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('parent_dag_with_ssh', default_args=default_args, schedule_interval='@daily') as dag:
    start = DummyOperator(task_id='start')

    run_child_dag = SSHOperator(
        task_id='run_child_dag',
        ssh_conn_id='my_ssh_connection',  # 설정한 SSH 연결 ID
        command='python3 /path/to/your/child_dag.py',
        do_xcom_push=False
    )

    end = DummyOperator(task_id='end')

    start >> run_child_dag >> end
  • 예전에 사용한 방법 중 하나는 TriggerDagRunOperator가 없을 때 서버에서 직접 python file을 SSHOperator를 통해 실행시키는 방향으로 다른 DAG를 호출 했었음
  • Python file에 매개변수를 전달하고자 할 때 argparse를 통해 SSHOperator에 cmd에 기입하여 전달한 경우도 있었음

 

728x90