728x90
Airflow에서 DAGs를 호출하는 방법은 여러가지가 있습니다. 관련해서 정리 해보겠습니다.
1. TriggerDagRunOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from dateutil.relativedelta import relativedelta
from datetime import datetime as dt
from datetime import timedelta
from datetime import datetime
import time
import pendulum
from airflow import DAG
from utils import AlertUtils
from airflow.models.baseoperator import chain
from airflow.utils.task_group import TaskGroup
local_tz=pendulum.timezone("Asia/Seoul")
today = datetime.now(local_tz)
one_days_ago_str = datetime.strftime(today - timedelta(days=1), '%Y%m%d')
v_end_date = today.strftime('%Y-%m-%d')
two_days_ago = today - timedelta(days=2)
dayOfWeek = datetime.now(local_tz).strftime("%A")
dayDecimal = datetime.now(local_tz).strftime("%d")
sql_path = '/app/airflow-dags/dags/test/'
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False
}
with DAG(
dag_id='trigger_dag',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
start_date=two_days_ago,
schedule_interval=None,
tags=['test'],
catchup=False
) as dag:
trigger_dag = TriggerDagRunOperator(
task_id="trigger_dag",
trigger_dag_id="trigger_dag",
wait_for_completion=True
)
trigger_dag
- 가장 일반적인 방법
- trigger_dag_id는 트리거할 DAG의 ID
2. ExternalTaskSensor
# parent_dag_with_sensor.py
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('parent_dag_with_sensor', default_args=default_args, schedule_interval='@daily') as dag:
start = DummyOperator(task_id='start')
wait_for_child = ExternalTaskSensor(
task_id='wait_for_child_task',
external_dag_id='child_dag', # 감시할 DAG의 ID
external_task_id='end', # 감시할 Task의 ID
mode='poke',
timeout=600,
poke_interval=60,
soft_fail=True
)
end = DummyOperator(task_id='end')
start >> wait_for_child >> end
- ExternalTaskSensor는 다른 DAG의 특정 Task가 완료될 때까지 기다렸다가 다음 Task를 실행할 수 있게 해주는 Sensor
- 이 방법은 다른 DAG의 상태를 감시하고, 해당 DAG의 특정 Task가 완료되었을 때 이후의 Task를 실행하고자 할 때 유용
3. SSHOperator로 python file 호출
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('parent_dag_with_ssh', default_args=default_args, schedule_interval='@daily') as dag:
start = DummyOperator(task_id='start')
run_child_dag = SSHOperator(
task_id='run_child_dag',
ssh_conn_id='my_ssh_connection', # 설정한 SSH 연결 ID
command='python3 /path/to/your/child_dag.py',
do_xcom_push=False
)
end = DummyOperator(task_id='end')
start >> run_child_dag >> end
- 예전에 사용한 방법 중 하나는 TriggerDagRunOperator가 없을 때 서버에서 직접 python file을 SSHOperator를 통해 실행시키는 방향으로 다른 DAG를 호출 했었음
- Python file에 매개변수를 전달하고자 할 때 argparse를 통해 SSHOperator에 cmd에 기입하여 전달한 경우도 있었음
728x90
'BigData > Apache Airflow' 카테고리의 다른 글
[Airflow] Prometheus & Grafana에서 확인 할 수 있는 Airflow metrics 정리 (0) | 2024.05.18 |
---|---|
[Airflow] SLA(Service Layer Agreement) 서비스 수준 계약 정리 (0) | 2024.05.18 |
[Airflow] DAG Parsing, DAG Processor 정리 + import와 parsing error 이슈 정리 (0) | 2024.04.27 |
[Airflow] Airflow 암호화 fernet key 정리 (0) | 2024.04.27 |
[Airflow] Metastore version, RDBMS 종류에 따른 차이 정리 (0) | 2024.04.27 |
댓글