BigData/Apache Airflow
[Airflow] Sensor 정리, ExternalTaskSensor 와 S3KeySensor
스파이디웹
2024. 6. 22. 09:38
728x90
이번 포스트에는 Airflow에서 특정 작업 혹은 객체를 감지하는 Sensor, 그 중에서도 많이 쓰일 것으로 추정되는 S3KeySensor와 ExternalTaskSensor를 정리해보겠습니다.
1. Sensor란
- Airflow에서는 특정 상황이 발생할때까지 대기하는 Sensor Operator를 제공
- 시간이 기준이 될 수도 있고 파일이나 외부 이벤트가 기준이 될수도 있음
- Sensor를 사용하면 이러한 상황이 발생할 때까지 기다렸다가 downstream task들이 진행되게 할 수 있음
2. ExternalTaskSensor
- 다른 DAG의 특정 작업이 끝나기를 체크했다가 다음 의존관계가 있는 task를 실행할 때 사용
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
def prev_execution_dt(dt, **kwargs):
return dt - timedelta(hours=1)
# Define the task to wait for
wait_for_task = ExternalTaskSensor(
task_id='wait_for_task',
external_dag_id='upstream_dag', # ID of the other DAG
allowed_states=['success'],
execution_date_fn=prev_execution_dt,
# execution_delta=timedelta(hours=1, minutes=30)
timeout=1800, # 30분, Timeout in seconds, set to None for no timeout
mode='poke', # Mode can be 'poke' or 'reschedule'
poke_interval=60 # Time in seconds to wait between checks
)
- external_dag_id: 대기할 upstream dag id
- allowed_states: 대기해야하는 upstream dag의 상태를 리스트 형태로 지정한다. default 값은 ['success']
- timeout: 언제까지 대기할지 초 단위로 지정
- mode: poke 모드로 수행할 것인지 reschedule 모드로 수행할것인지 지정한다. poke가 default 모드이기 때문에 생략해도 됨
- poke_interval: 몇 분 간격으로 상태를 확인할지를 초단위 또는 timedelta 형식으로 지정한
upstream DAG의 스케줄시간과 체크하는 DAG의 스케줄링 시간이 동일해야 하는데, DAG와 동일한 logical date에 해당하는 upstream DAG의 수행을 확인하기 때문
따라서 스케줄 된 시간이 다른 경우 또는 manual 수행인 경우 Sensor가 제대로 작동하지 않음
- 만약 스케줄을 동일하게 설정했을 때 대기하는 시간이 너무 길어지는 경우 execution_delta 또는 execution_date_fn을 사용하여 DAG의 스케줄 시간을 다르게 설정, 둘 다 datetime 및 timedelta를 통해 설정 가능
- 단 두 개를 동시에 지정할 수는 없음
3. S3KeySensor
- 특정 s3버킷의 폴더에 원하는 파일이름, 파일 형식이 있는지를 poke_interval마다 체크해서 리턴값을 받아오는 sensor
- 마찬가지로 파일이 생성된 이후에 의존 관계가 있는 작업을 수행해야 할 때 사용 → 파일이 없으면 poke주기 동안 계속 체크를 하면서 생성 될 때까지 대기
- s3 bucket에 접근 할 수 있는 IAM계정값이 있는 aws_conn_id 커넥션이 필요함
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
s3_file_check = S3KeySensor(
task_id="s3_file_check",
poke_interval=60,
timeout=3000,
bucket_key=f"keysensortest/*.txt",
bucket_name="버킷이름",
wildcard_match=True,
aws_conn_id="aws_default",
)
s3_file_check
728x90