본문 바로가기
728x90

DAGS4

[Airflow] Sensor 정리, ExternalTaskSensor 와 S3KeySensor 이번 포스트에는 Airflow에서 특정 작업 혹은 객체를 감지하는 Sensor, 그 중에서도 많이 쓰일 것으로 추정되는 S3KeySensor와 ExternalTaskSensor를 정리해보겠습니다. 1. Sensor란Airflow에서는 특정 상황이 발생할때까지 대기하는 Sensor Operator를 제공시간이 기준이 될 수도 있고 파일이나 외부 이벤트가 기준이 될수도 있음Sensor를 사용하면 이러한 상황이 발생할 때까지 기다렸다가 downstream task들이 진행되게 할 수 있음2. ExternalTaskSensor다른 DAG의 특정 작업이 끝나기를 체크했다가 다음 의존관계가 있는 task를 실행할 때 사용import pendulumfrom airflow import DAGfrom airflow.op.. 2024. 6. 22.
[Airflow] Prometheus & Grafana에서 확인 할 수 있는 Airflow metrics 정리 Airflow에서는 Statsd라는 컴포넌트를 통해 Airflow의 메트릭을 Prometheus로 보내고, Grafana에서 시각적으로 확인해 볼 수 있습니다.즉 Airflow에서 일어나는 일을 모니터링 할 수 있게 됩니다.Airflow의 Metric에는 어떤 것들이 있는지 공식 홈페이지를 통해 확인 해보겠습니다.그 중에서 유의깊게 봐야 할 metric에 대해서 빨간색으로 진하게(bold)처리 해두었으니, 필터링해서 보시면 될 것 같습니다.1. CountersCounters카운터는 단순히 증가하는 값을 나타내며, 일반적으로 주어진 간격 동안의 이벤트 횟수를 추적합니다.예를 들어, 요청이 서버로 들어오는 횟수나 오류가 발생한 횟수 등을 계산할 수 있습니다.카운터는 보통 리셋되지 않고 지속적으로 증가합니다... 2024. 5. 18.
[Airflow] DAG Parsing, DAG Processor 정리 + import와 parsing error 이슈 정리 Airflow DAG Parsingscheduler에 dag_processor라는것이 포함되어 있고, 이것이 dag를 parsing하는 역할을 해줌 1. 이슈 정리Airflow on k8s 이전을 위해 DAG를 개발계의 k8s에 gitsync를 시켜둔 상황import error가 나와 확인 해보니 version up에 따른 method 명칭 변경따라서 명칭을 변경하거나 사용하지 않은 library는 import문에서 제거아무리 기다려도 변경사항 반영이 안된 채로 계속 import에러가 잔류실제 코드는 gitsync를 통해 동기화 됐고, 컨테이너에 들어가서 확인해봐도 동기화 된 상태2. 원인airflow dag 내부에 운영계에 있는 RDB, Redshift와 connect하는 코드가 있었는데, 개발계에서 .. 2024. 4. 27.
[Airflow] EMR create + Step 제출(Spark job) + StepSensor Dag 구성하기(feat. ETL) 아직까지도 많은 기업에서는 EMR을 원하는 시간대에 띄워서 batch job을 airflow schedule에 맞게 실행시키고 종료시키는 ETL 형태를 많이 사용하고 있습니다. 그래서 이번 포스트에는 Airflow로 EMR을 띄우고, spark job을 제출하고 job이 끝나는 대로 EMR을 종료시키는 DAG를 구성해보겠습니다. Airflow는 미리 구성되어 있다고 가정하고 시작하겠습니다. Airflow 구성부터 해보고 싶으시면 아래의 링크를 참조해주세요. https://spidyweb.tistory.com/449 [Airflow] Airflow cluster, celery executor + flower + RabbitMQ 환경 구성하기 이번 포스트에는 AWS EC2 3대로 구성된 airflow clu.. 2023. 7. 30.
728x90