[Airflow] A Summary of the Airflow Metrics You Can View in Prometheus & Grafana

스파이디웹 2024. 5. 18. 23:36

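Airflow emits its metrics through a component called StatsD. From there they can be delivered to Prometheus (typically via a StatsD exporter that Prometheus scrapes) and visualized in Grafana.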

In other words, you can monitor what is happening inside Airflow.
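To make the first hop of that pipeline concrete, here is a minimal sketch of the plain StatsD line format (`<name>:<value>|<type>`) in which counters, gauges, and timers travel. The helper `statsd_line`, the `airflow` prefix (assumed to be the configured StatsD prefix), and the DAG name `example_dag` are illustrative assumptions, not Airflow's own code.

```python
def statsd_line(name: str, value: float, kind: str, prefix: str = "airflow") -> str:
    """Format one metric in the plain StatsD line protocol: <name>:<value>|<type>."""
    # StatsD type codes: c = counter, g = gauge, ms = timer (milliseconds).
    types = {"counter": "c", "gauge": "g", "timer": "ms"}
    if kind not in types:
        raise ValueError(f"unknown metric kind: {kind}")
    return f"{prefix}.{name}:{value}|{types[kind]}"


# Hypothetical values, one per metric type covered in this post.
lines = [
    statsd_line("scheduler_heartbeat", 1, "counter"),
    statsd_line("executor.open_slots", 32, "gauge"),
    statsd_line("dagrun.schedule_delay.example_dag", 1250.5, "timer"),
]
for line in lines:
    print(line)
```

Each line would normally be sent as a UDP datagram to the StatsD endpoint; the sketch only formats the payload so that the three metric types are easy to tell apart.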

Let's go through the metrics Airflow provides, based on the official documentation.

Among them, the metrics that deserve particular attention are marked in bold red, so you can filter on those as you read.


1. Counters

Counters

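  • A counter is a value that only increases; it typically tracks how many times an event has occurred over a given interval.
  • For example, it can count the number of requests reaching a server or the number of errors that have occurred.
  • A counter is normally not reset and keeps increasing (in Prometheus it returns to zero only when the reporting process restarts), so you track system activity by watching its rate of increase rather than its raw value.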
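The "rate of increase" idea can be sketched as follows. This is a simplified illustration, not Prometheus's actual `rate()` implementation: it sums the increases between consecutive samples and treats any drop as a counter reset. The sample values for `ti_failures` are made up.

```python
from typing import Sequence, Tuple

Sample = Tuple[float, float]  # (timestamp in seconds, counter value)


def counter_increase(samples: Sequence[Sample]) -> float:
    """Total increase across the samples, treating any drop as a counter reset."""
    total = 0.0
    for (_, prev), (_, curr) in zip(samples, samples[1:]):
        if curr >= prev:
            total += curr - prev
        else:
            # After a restart the counter begins again from zero, so the whole
            # current value counts as new increase.
            total += curr
    return total


def counter_rate(samples: Sequence[Sample]) -> float:
    """Average per-second rate of increase over the sampled window."""
    if len(samples) < 2:
        return 0.0
    elapsed = samples[-1][0] - samples[0][0]
    return counter_increase(samples) / elapsed if elapsed > 0 else 0.0


# Hypothetical scrapes of ti_failures, 30 seconds apart, with one restart.
ti_failures = [(0, 10), (30, 13), (60, 2), (90, 5)]
print(counter_increase(ti_failures))  # 8.0
print(counter_rate(ti_failures))
```

The raw value drops from 13 to 2 at the restart, which is why dashboards usually plot a rate or increase instead of the counter itself.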

 

| Name | Description |
| --- | --- |
| <job_name>_start | Number of started <job_name> jobs, e.g. SchedulerJob, LocalTaskJob |
| <job_name>_end | Number of ended <job_name> jobs, e.g. SchedulerJob, LocalTaskJob |
| <job_name>_heartbeat_failure | Number of failed heartbeats for a <job_name> job, e.g. SchedulerJob, LocalTaskJob |
| local_task_job.task_exit.<job_id>.<dag_id>.<task_id>.<return_code> | Number of LocalTaskJob terminations with a <return_code> while running a task <task_id> of a DAG <dag_id>. |
| local_task_job.task_exit | Number of LocalTaskJob terminations with a <return_code> while running a task <task_id> of a DAG <dag_id>. Metric with job_id, dag_id, task_id and return_code tagging. |
| operator_failures_<operator_name> | Operator <operator_name> failures |
| operator_failures | Operator <operator_name> failures. Metric with operator_name tagging. |
| operator_successes_<operator_name> | Operator <operator_name> successes |
| operator_successes | Operator <operator_name> successes. Metric with operator_name tagging. |
| ti_failures | Overall task instance failures. Metric with dag_id and task_id tagging. |
| ti_successes | Overall task instance successes. Metric with dag_id and task_id tagging. |
| previously_succeeded | Number of previously succeeded task instances. Metric with dag_id and task_id tagging. |
| zombies_killed | Zombie tasks killed. Metric with dag_id and task_id tagging. |
| scheduler_heartbeat | Scheduler heartbeats |
| dag_processing.processes | Relative number of currently running DAG parsing processes (i.e. this delta is negative when, since the last metric was sent, processes have completed). Metric with file_path and action tagging. |
| dag_processing.processor_timeouts | Number of file processors that have been killed due to taking too long. Metric with file_path tagging. |
| dag_processing.sla_callback_count | Number of SLA callbacks received |
| dag_processing.other_callback_count | Number of non-SLA callbacks received |
| dag_processing.file_path_queue_update_count | Number of times the filesystem has been scanned and all existing DAGs queued |
| dag_file_processor_timeouts | (DEPRECATED) Same behavior as dag_processing.processor_timeouts |
| dag_processing.manager_stalls | Number of stalled DagFileProcessorManager |
| dag_file_refresh_error | Number of failures loading any DAG files |
| scheduler.tasks.killed_externally | Number of tasks killed externally. Metric with dag_id and task_id tagging. |
| scheduler.orphaned_tasks.cleared | Number of orphaned tasks cleared by the scheduler |
| scheduler.orphaned_tasks.adopted | Number of orphaned tasks adopted by the scheduler |
| scheduler.critical_section_busy | Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process. |
| sla_missed | Number of SLA misses. Metric with dag_id and task_id tagging. |
| sla_callback_notification_failure | Number of failed SLA miss callback notification attempts. Metric with dag_id and func_name tagging. |
| sla_email_notification_failure | Number of failed SLA miss email notification attempts. Metric with dag_id tagging. |
| ti.start.<dag_id>.<task_id> | Number of started tasks in a given DAG. Similar to <job_name>_start but for tasks. |
| ti.start | Number of started tasks in a given DAG. Similar to <job_name>_start but for tasks. Metric with dag_id and task_id tagging. |
| ti.finish.<dag_id>.<task_id>.<state> | Number of completed tasks in a given DAG. Similar to <job_name>_end but for tasks. |
| ti.finish | Number of completed tasks in a given DAG. Similar to <job_name>_end but for tasks. Metric with dag_id and task_id tagging. |
| dag.callback_exceptions | Number of exceptions raised from DAG callbacks. When this happens, it means a DAG callback is not working. Metric with dag_id tagging. |
| celery.task_timeout_error | Number of AirflowTaskTimeout errors raised when publishing a task to the Celery broker. |
| celery.execute_command.failure | Number of non-zero exit codes from Celery tasks. |
| task_removed_from_dag.<dag_id> | Number of tasks removed for a given DAG (i.e. the task no longer exists in the DAG). |
| task_removed_from_dag | Number of tasks removed for a given DAG (i.e. the task no longer exists in the DAG). Metric with dag_id and run_type tagging. |
| task_restored_to_dag.<dag_id> | Number of tasks restored for a given DAG (i.e. a task instance that was previously in REMOVED state in the DB is added back to the DAG file). |
| task_restored_to_dag | Number of tasks restored for a given DAG (i.e. a task instance that was previously in REMOVED state in the DB is added back to the DAG file). Metric with dag_id and run_type tagging. |
| task_instance_created_<operator_name> | Number of task instances created for a given operator |
| task_instance_created | Number of task instances created for a given operator. Metric with dag_id and run_type tagging. |
| triggerer_heartbeat | Triggerer heartbeats |
| triggers.blocked_main_thread | Number of triggers that blocked the main thread (likely due to not being fully asynchronous) |
| triggers.failed | Number of triggers that errored before they could fire an event |
| triggers.succeeded | Number of triggers that have fired at least one event |
| dataset.updates | Number of updated datasets |
| dataset.orphaned | Number of datasets marked as orphans because they are no longer referenced in DAG schedule parameters or task outlets |
| dataset.triggered_dagruns | Number of DAG runs triggered by a dataset update |
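Many counters above come in two forms: a dotted name with the IDs embedded (such as ti.finish.<dag_id>.<task_id>.<state>) and a short name that carries the same information as tags. The sketch below shows one way to turn the dotted form into the tagged form, which is roughly the job a StatsD-to-Prometheus mapping performs. The rule list and the `to_tagged` helper are hypothetical, and the patterns assume that IDs contain no dots, which does not hold for every DAG (task IDs inside task groups contain dots, for example).

```python
import re
from typing import Dict, Optional, Tuple

# Hypothetical mapping rules: (pattern for the dotted name, tagged metric name).
RULES = [
    (re.compile(r"^ti\.finish\.(?P<dag_id>[^.]+)\.(?P<task_id>[^.]+)\.(?P<state>[^.]+)$"), "ti.finish"),
    (re.compile(r"^ti\.start\.(?P<dag_id>[^.]+)\.(?P<task_id>[^.]+)$"), "ti.start"),
    (re.compile(r"^task_removed_from_dag\.(?P<dag_id>[^.]+)$"), "task_removed_from_dag"),
]


def to_tagged(name: str) -> Optional[Tuple[str, Dict[str, str]]]:
    """Return (metric name, tags) for a dotted metric name, or None if no rule matches."""
    for pattern, base in RULES:
        match = pattern.match(name)
        if match:
            return base, match.groupdict()
    return None


# "example_dag" and "extract" are made-up IDs.
print(to_tagged("ti.finish.example_dag.extract.success"))
print(to_tagged("scheduler_heartbeat"))  # None: nothing is embedded in this name
```

With tags, a single query on ti.finish can be grouped or filtered by dag_id, which is usually more convenient in Grafana than one series name per DAG and task.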

 


2. Gauges

Gauges

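  • A gauge represents a value at a specific point in time, and that value can go up or down from one reading to the next.
  • For example, the amount of memory currently in use or the remaining disk space can be expressed as a gauge.
  • Each reading replaces the previous one instead of accumulating, so a gauge is used to show the current state.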
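The difference from a counter is easiest to see in code. Below is a minimal sketch of gauge semantics, where the latest report wins. `GaugeStore` is a hypothetical class for illustration and the readings of executor.open_slots are made up.

```python
from typing import Dict


class GaugeStore:
    """Keeps only the most recent value reported for each gauge."""

    def __init__(self) -> None:
        self._values: Dict[str, float] = {}

    def set(self, name: str, value: float) -> None:
        # A gauge report replaces the previous value instead of adding to it.
        self._values[name] = value

    def get(self, name: str) -> float:
        return self._values[name]


gauges = GaugeStore()
for open_slots in (32, 20, 27):  # hypothetical readings of executor.open_slots
    gauges.set("executor.open_slots", open_slots)

print(gauges.get("executor.open_slots"))  # 27
```

Because only the latest reading is kept, a gauge can be plotted directly as a level, with no rate calculation needed.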

 

| Name | Description |
| --- | --- |
| dagbag_size | Number of DAGs found when the scheduler ran a scan based on its configuration |
| dag_processing.import_errors | Number of errors from trying to parse DAG files |
| dag_processing.total_parse_time | Seconds taken to scan and import dag_processing.file_path_queue_size DAG files |
| dag_processing.file_path_queue_size | Number of DAG files to be considered for the next scan |
| dag_processing.last_run.seconds_ago.<dag_file> | Seconds since <dag_file> was last processed |
| scheduler.tasks.starving | Number of tasks that cannot be scheduled because of no open slot in pool |
| scheduler.tasks.executable | Number of tasks that are ready for execution (set to queued) with respect to pool limits, DAG concurrency, executor state, and priority. |
| executor.open_slots | Number of open slots on executor |
| executor.queued_tasks | Number of queued tasks on executor |
| executor.running_tasks | Number of running tasks on executor |
| pool.open_slots.<pool_name> | Number of open slots in the pool |
| pool.open_slots | Number of open slots in the pool. Metric with pool_name tagging. |
| pool.queued_slots.<pool_name> | Number of queued slots in the pool |
| pool.queued_slots | Number of queued slots in the pool. Metric with pool_name tagging. |
| pool.running_slots.<pool_name> | Number of running slots in the pool |
| pool.running_slots | Number of running slots in the pool. Metric with pool_name tagging. |
| pool.deferred_slots.<pool_name> | Number of deferred slots in the pool |
| pool.deferred_slots | Number of deferred slots in the pool. Metric with pool_name tagging. |
| pool.starving_tasks.<pool_name> | Number of starving tasks in the pool |
| pool.starving_tasks | Number of starving tasks in the pool. Metric with pool_name tagging. |
| triggers.running.<hostname> | Number of triggers currently running for a triggerer (described by hostname) |
| triggers.running | Number of triggers currently running for a triggerer (described by hostname). Metric with hostname tagging. |

 


3. Timers

 

| Name | Description |
| --- | --- |
| dagrun.dependency-check.<dag_id> | Milliseconds taken to check DAG dependencies |
| dagrun.dependency-check | Milliseconds taken to check DAG dependencies. Metric with dag_id tagging. |
| dag.<dag_id>.<task_id>.duration | Seconds taken to run a task |
| task.duration | Seconds taken to run a task. Metric with dag_id and task_id tagging. |
| dag.<dag_id>.<task_id>.scheduled_duration | Seconds a task spends in the Scheduled state, before being Queued |
| task.scheduled_duration | Seconds a task spends in the Scheduled state, before being Queued. Metric with dag_id and task_id tagging. |
| dag.<dag_id>.<task_id>.queued_duration | Seconds a task spends in the Queued state, before being Running |
| task.queued_duration | Seconds a task spends in the Queued state, before being Running. Metric with dag_id and task_id tagging. |
| dag_processing.last_duration.<dag_file> | Seconds taken to load the given DAG file |
| dag_processing.last_duration | Seconds taken to load the given DAG file. Metric with file_name tagging. |
| dagrun.duration.success.<dag_id> | Seconds taken for a DagRun to reach success state |
| dagrun.duration.success | Seconds taken for a DagRun to reach success state. Metric with dag_id and run_type tagging. |
| dagrun.duration.failed.<dag_id> | Seconds taken for a DagRun to reach failed state |
| dagrun.duration.failed | Seconds taken for a DagRun to reach failed state. Metric with dag_id and run_type tagging. |
| dagrun.schedule_delay.<dag_id> | Milliseconds of delay between the scheduled DagRun start date and the actual DagRun start date |
| dagrun.schedule_delay | Milliseconds of delay between the scheduled DagRun start date and the actual DagRun start date. Metric with dag_id tagging. |
| scheduler.critical_section_duration | Milliseconds spent in the critical section of the scheduler loop (only a single scheduler can enter this loop at a time) |
| scheduler.critical_section_query_duration | Milliseconds spent running the critical section task instance query |
| scheduler.scheduler_loop_duration | Milliseconds spent running one scheduler loop |
| dagrun.<dag_id>.first_task_scheduling_delay | Seconds elapsed between first task start_date and dagrun expected start |
| dagrun.first_task_scheduling_delay | Seconds elapsed between first task start_date and dagrun expected start. Metric with dag_id and run_type tagging. |
| collect_db_dags | Milliseconds taken for fetching all Serialized Dags from DB |
| kubernetes_executor.clear_not_launched_queued_tasks.duration | Milliseconds taken for clearing not launched queued tasks in Kubernetes Executor |
| kubernetes_executor.adopt_task_instances.duration | Milliseconds taken to adopt the task instances in Kubernetes Executor |
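Timers record how long something took, so they are usually read as a distribution rather than a single number. As a rough illustration, the sketch below summarizes a handful of made-up dagrun.schedule_delay samples (in milliseconds) with a nearest-rank percentile. This is a simple textbook method chosen for clarity, not necessarily how a StatsD exporter or Prometheus computes quantiles.

```python
import math
from typing import Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% of samples at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in the range (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


# Hypothetical dagrun.schedule_delay samples in milliseconds, with one outlier.
delays_ms = [120, 95, 340, 110, 2300, 130, 105, 98, 150, 125]

p50 = percentile(delays_ms, 50)
p95 = percentile(delays_ms, 95)
mean = sum(delays_ms) / len(delays_ms)
print(p50, p95, mean)  # 120 2300 357.3
```

The single slow run pulls the mean far above the median, which is why percentiles tend to be the more useful view for duration and delay metrics. Also note the unit column above: some timers are described in seconds and others in milliseconds, so check the unit before comparing panels.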

 

 

 

Reference:

https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/metrics.html

 
