BigData/Apache Airflow

[Airflow] Sensor 정리, ExternalTaskSensor 와 S3KeySensor

스파이디웹 2024. 6. 22. 09:38
728x90

이번 포스트에는 Airflow에서 특정 작업 혹은 객체를 감지하는 Sensor, 그 중에서도 많이 쓰일 것으로 추정되는 S3KeySensor와 ExternalTaskSensor를 정리해보겠습니다.

 


1. Sensor란

  • Airflow에서는 특정 상황이 발생할때까지 대기하는 Sensor Operator를 제공
  • 시간이 기준이 될 수도 있고 파일이나 외부 이벤트가 기준이 될수도 있음
  • Sensor를 사용하면 이러한 상황이 발생할 때까지 기다렸다가 downstream task들이 진행되게 할 수 있음

2. ExternalTaskSensor

  • 다른 DAG의 특정 작업이 끝나기를 체크했다가 다음 의존관계가 있는 task를 실행할 때 사용
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta

   def prev_execution_dt(dt, **kwargs):
        return dt - timedelta(hours=1)


    # Define the task to wait for
    wait_for_task = ExternalTaskSensor(
        task_id='wait_for_task',
        external_dag_id='upstream_dag',  # ID of the other DAG
        allowed_states=['success'],
        execution_date_fn=prev_execution_dt,
        # execution_delta=timedelta(hours=1, minutes=30)
        timeout=1800,  # 30분, Timeout in seconds, set to None for no timeout
        mode='poke',   # Mode can be 'poke' or 'reschedule'
        poke_interval=60  # Time in seconds to wait between checks
    )
  • external_dag_id: 대기할 upstream dag id
  • allowed_states: 대기해야하는 upstream dag의 상태를 리스트 형태로 지정한다. default 값은 ['success']
  • timeout: 언제까지 대기할지 초 단위로 지정
  • mode: poke 모드로 수행할 것인지 reschedule 모드로 수행할것인지 지정한다. poke가 default 모드이기 때문에 생략해도 됨
  • poke_interval: 몇 분 간격으로 상태를 확인할지를 초단위 또는 timedelta 형식으로 지정한
upstream DAG의 스케줄시간과 체크하는 DAG의 스케줄링 시간이 동일해야 하는데, DAG와 동일한 logical date에 해당하는 upstream DAG의 수행을 확인하기 때문
따라서 스케줄 된 시간이 다른 경우 또는 manual 수행인 경우 Sensor가 제대로 작동하지 않음
  • 만약 스케줄을 동일하게 설정했을 때 대기하는 시간이 너무 길어지는 경우 execution_delta 또는 execution_date_fn을 사용하여 DAG의 스케줄 시간을 다르게 설정, 둘 다 datetime 및 timedelta를 통해 설정 가능
  • 단 두 개를 동시에 지정할 수는 없음

3. S3KeySensor

  • 특정 s3버킷의 폴더에 원하는 파일이름, 파일 형식이 있는지를 poke_interval마다 체크해서 리턴값을 받아오는 sensor
  • 마찬가지로 파일이 생성된 이후에 의존 관계가 있는 작업을 수행해야 할 때 사용 → 파일이 없으면 poke주기 동안 계속 체크를 하면서 생성 될 때까지 대기
  • s3 bucket에 접근 할 수 있는 IAM계정값이 있는 aws_conn_id 커넥션이 필요함
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

s3_file_check = S3KeySensor(
        task_id="s3_file_check",
        poke_interval=60,
        timeout=3000,
        bucket_key=f"keysensortest/*.txt",
        bucket_name="버킷이름",
        wildcard_match=True,
        aws_conn_id="aws_default",
    )
    s3_file_check

 

728x90