본문 바로가기
BigData/Apache Airflow

[Airflow] DAG Parsing, DAG Processor 정리 + import와 parsing error 이슈 정리

by 스파이디웹 2024. 4. 27.
728x90

Airflow DAG Parsing

  • scheduler에 dag_processor라는것이 포함되어 있고, 이것이 dag를 parsing하는 역할을 해줌

Airflow dag parsing process

 


1. 이슈 정리

  • Airflow on k8s 이전을 위해 DAG를 개발계의 k8s에 gitsync를 시켜둔 상황
  • import error가 나와 확인 해보니 version up에 따른 method 명칭 변경
  • 따라서 명칭을 변경하거나 사용하지 않은 library는 import문에서 제거
  • 아무리 기다려도 변경사항 반영이 안된 채로 계속 import에러가 잔류
  • 실제 코드는 gitsync를 통해 동기화 됐고, 컨테이너에 들어가서 확인해봐도 동기화 된 상태


2. 원인

  • airflow dag 내부에 운영계에 있는 RDB, Redshift와 connect하는 코드가 있었는데, 개발계에서 진행 했기 때문에 해당코드가 문제를 일으킴 → 계속 연결하려고 시도하기 때문에 DAGs 파일에 변경 사항이 적용되더라도 parsing자체가 되지 않아 UI에 반영되지 않았던 것.

3. 해결법

  • 운영계로 넘어가면 해결될 문제
  • scheduler pod의 shceduler container로 접속 후 scheduler의 dag_processor_manager log를 확인
    • pwd /opt/airflow/logs/dag_processor_manager의 dag_processor_manager.log
  • 아래 log와 같이 현재 실행되고 있는 dag의 경과 시간과 실패한 dag의 마지막 실행 시간을 확인 할 수 있는데, 실패 한 DAG의 실행 시간은 50s로 비슷하고 dag parsing은 2개의 dag씩 진행되고 있는 것을 확인 할 수 있음 → configs와 관련이 있다.


4. configs

dag parsing과 관련된 configs중에서 위 현상과 관련된 건 2개가 있는데 1,2번에 따라 connection에러가 있는 dag들이 50초동안 parsing시도를 2개의 프로세서가 하고 있었던 것. 따라서 connection하는 코드를 지우고 반영을 해도 현재 parsing이 되지 않은 dag 전부를 2개의 프로세서가 차례로 50초씩 사용하고 있었기 때문에 DAG parsing이 느린 것으로 생각된 것.

1. parsing_processes(AIRFLOW__SCHEDULER__PARSING_PROCESSES)

  • airflow.cfg에 사용될 때 parsing_processes로 사용, 기본 값 2 즉, parsing용 process가 2개 뜨는 것

  • helm chart의 values.yaml에서는 config: 밑의 scheduler 부분에 parsing_processes에 정의

2. dag_file_processor_timeout
(AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT)

  • airflow.cfg에서 사용될 때 dag_file_processor_timeout로 사용되고 기본값은 50초

  • helm chart의 values.yaml에서는 config: 밑의 core 부분에 dag_file_processor_timeout에 정의
  • 말그대로 DAG가 파싱이 가능한지를 체크할 수 있는 최대 대기시간

3. dagbag_import_timeout(AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT)

  • airflow.cfg에서 사용될 때 dagbag_import_timeout로 사용되고 기본값은 30초

  • helm chart의 values.yaml에서는 config: 밑의 core 부분에 dagbag_import_timeout에 정의
  • 객체가 Airflow에서 사용 가능한 DAG로 로드되는 데 걸리는 최대 시간을 제어
  • Airflow는 DAG 객체를 처리하고 DAG 상태를 업데이트하며, DAG에 대한 작업을 스케줄링
  • dagbag_import_timeout은 DAG 파일이 로드된 후 DAG 객체가 완전히 처리되는 데 소요되는 시간을 제어
  • DAG가 파싱 될 수 있는 상태에서 파싱되기 시작하고 complete 되기 까지의 기다릴 수 있는 시간 → 파싱 시간이 설정 값인 30초를 넘으면 import error가 난다.
728x90

댓글