[Airflow] Trigger Rule을 통해 반드시 실행 시켜야 할 Task 다루기
이번 포스트에는 Daily Batch 중에 비용절감을 위해 수정한 사항에 대해 정리해보겠습니다.
1. 수정이 필요한 이유
Daily Batch 실패 난 건 중에, ML server(GPU 장비 탑재된 EC2)에서 script가 돌아가는 job이 있었습니다.
EC2가 켜지는 Task는 성공했지만, script task 단계에서 에러가 났었고, EC2는 stop되지 않았습니다.
물론 후속작업들도 전부 upstream_failed에 의해 failed처리 되었습니다.
GPU 장비가 붙은 EC2는 비용이 어마어마하게 많이 나오기 때문에, 몇 시간만 켜져 있더라도 무시 하지 못할 비용이 나오게 됩니다. 따라서 script가 실패하더라도, 반드시 stop되게 DAG를 구성하는 것이 필요했습니다.
2. 기존 코드
기존의 코드는 병렬 처리 없이 task 마다 의존성을 구성하였고, trigger_rule 또한 모두 default값인 all_success로 되어 있었습니다. 따라서 EC2 서버에서 실행되는 script에서 에러가 난 경우, ec2_stop작업이 실행되지 않고, 계속 켜져 있는 문제가 생겼습니다.
그렇다고 ec2_stop 작업을 성공여부와 관계없이 항상 실행되게 trigger_rule을 all_done으로 설정한다면, ec2는 중지되겠지만, 그 뒤의 작업들이 실행되게 되어 선행 작업이 수행되지 않고 후행 작업이 실행되는 문제가 발생합니다.
with DAG(
...
) as dag:
script_on_ec2 = PythonOperator(
task_id='script_on_ec2',
python_callable=always_fail_task
)
ec2_stop = BashOperator(
task_id='ec2_stop',
bash_command='sleep 5',
trigger_rule='all_success'
)
create_emr = BashOperator(
task_id='create_emr',
bash_command='sleep 5',
trigger_rule='all_success'
)
after_job = BashOperator(
task_id='after_job',
bash_command='sleep 5',
trigger_rule='all_success'
)
...
script_on_ec2 >> ec_stop >> create_emr >> after_job >> ...
3. 바뀐 코드 및 DAG
더 좋은 방법이 있을 것이라고 생각되지만, 당시에 제가 떠올린 생각은 ec2_stop task와 후행작업을 병렬로 구성하고
ec2_stop task의 trigger_rule은 all_done, 후행작업의 trigger_rule은 all_success로 구성하는 것이였습니다.
이렇게 된다면 아래 2가지의 경우로 나뉘게 됩니다.
- always_fail_task 성공, all_done 항상 성공, all_success 이전 작업 성공에 의해 성공, all_success_2 또한 앞선 task가 모두 성공에 의해 성공 + 후행작업들 실행
- always_fail_task 실패, all_done 항상 성공, all_success 이전 작업 실패에 의해 실패, all_success_2 또한 앞선 작업이 전부 성공하지 못했기 때문에 실패 + 후행작업들 upstream_failed 처리
with DAG(
...
) as dag:
always_fail_task = PythonOperator(
task_id='always_fail_task',
python_callable=always_fail_task
) # ec2내에서 도는 script라 가정
all_done = BashOperator(
task_id='all_done',
bash_command='sleep 5',
trigger_rule='all_done'
) # ec2_stop task라 가정
all_success = BashOperator(
task_id='all_success',
bash_command='sleep 5',
trigger_rule='all_success'
) # ec2_stop 후행작업인 emr_create작업이라 가정
all_success_2 = BashOperator(
task_id='all_success_2',
bash_command='sleep 5',
trigger_rule='all_success'
) # emr에 step을 제출하는 작업이라 가정
always_fail_task >> [all_done, all_success] >> all_success_2
위와 같이 설정하고 난 후에 script에 실패가 났을 때에도 EC2는 항상 중지되게 구성되었으며 비용 절감을 할 수 있게 수정되었습니다.