BigData/Apache Airflow

[Airflow] Airflow DAGs 이상감지, 알림받기, 결과전송 (EmailOperator, Slack)

스파이디웹 2023. 8. 30. 03:41
728x90

이번 포스트에는 Airflow DAGs이 success 및 failed 또는 Task중에 보내고 싶은 결과가 있는 경우 전송하는 방법에 대해 정리해보겠습니다.

 

방법으로는 EmailOperator와 Slack을 사용하는 방법으로 크게 2가지가 있습니다.


EmailOperator

1. 준비 사항

1) GMAIL 계정 생성

EmailOperator를 사용하기 위해서는 stmp로 설정할 host가 필요합니다.

저는 gmail을 사용하기로 했고 새로운 계정을 하나 만들었습니다.

 

2) IMAP 켜기

GMAIL → 설정 → 모든 설정 보기

전달 및 POP/IMAP → IMAP 사용

3) 보안 설정

구글 계정 관리 → 보안 → 2단계 인증

앱 비밀번호 클릭

메일, 기기 선택

16자리 비밀번호가 생성 되는데, 보관했다가 airflow.cfg에 반영해야 합니다.

4) Airflow.cfg 내용 수정

smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = spidywebpjh@gmail.com
smtp_password = (발급받은 16자리 패스워드)
smtp_port = 587
smtp_mail_from = spidywebpjh@gmail.com
smtp_timeout = 30
smtp_retry_limit = 5


2. Airflow DAGs 구성

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.dates import days_ago
from datetime import timedelta


cmd="echo 'test succeed'"

default_args = {
    'owner': 'spidyweb',
    'retries': 0,
    'retry_delay': timedelta(seconds=20),
    'depends_on_past': False
}

with DAG(
    'airflow-emailoperator-test',
    start_date=days_ago(2),
    default_args=default_args,
    schedule_interval='*/10 * * * *',
    catchup=False,
    tags=['airflow-emailoperator-test'],
    is_paused_upon_creation=False,
) as dag:
    test_task = BashOperator(
    task_id='test_task',
    bash_command=cmd,
    )

    send_mail = EmailOperator(
    task_id='send_mail',
    to='spidywebpjh@gmail.com',
    subject='airflow EmailOperator를 통한 메일 발송 테스트',
    html_content=
    f'''
    이것은 airflow EmailOperator를 통해 발송되는 메일입니다. <br/>
    줄 바꿈은 위와같이 br을 사용하여 바꿉니다. <br/><br/>
    감사합니다.
    '''
    )

#의존관계 구성
    test_task >> send_mail

결과 확인

성공 시, 실패 시 메일 받기

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.dates import days_ago
from datetime import timedelta


cmd="echo 'test succeed'"

default_args = {
    'owner': 'spidyweb',
    'retries': 0,
    'retry_delay': timedelta(seconds=20),
    'depends_on_past': False
}

def get_email_success():
    send_mail = EmailOperator(
    task_id='send_mail',
    to='spidywebpjh@gmail.com',
    subject='airflow EmailOperator를 통한 메일 발송 테스트',
    html_content=
    f'''
    Airflow task 성공!
    '''
    )
    
 def get_email_fail():
    send_mail = EmailOperator(
    task_id='send_mail',
    to='spidywebpjh@gmail.com',
    subject='airflow EmailOperator를 통한 메일 발송 테스트',
    html_content=
    f'''
    Airflow task 실패!
    '''
    )

# on_failure_callback= 와 on_success_callback= 에 정의한 함수를 사용
with DAG(
    'airflow-emailoperator-test',
    start_date=days_ago(2),
    default_args=default_args,
    schedule_interval='*/10 * * * *',
    catchup=False,
    tags=['airflow-emailoperator-test'],
    is_paused_upon_creation=False,
    on_failure_callback=get_email_fail,
    on_success_callback=get_email_success
) as dag:
    test_task = BashOperator(
    task_id='test_task',
    bash_command=cmd,
    )

    

#의존관계 구성
    test_task

 

분기처리를 통한 담당자에게 메일 보내기

# email을 보내는 AlertUtils.py 파일에 아래와 같은 함수를 정의

def email_alert(context,email=""):
    if email == "팀1":
        email_recipients = ["1","2","3"]
    elif email == "팀2":
        email_recipients = ["4","5","6","7","8","9","10"]
    elif email == "test":
        email_recipients = ["1"]
    else:
        email_recipients = ["1","2","3"]
        if email.find(',') == -1:
            email_recipients.append(email) # 전달인자로 email이 들어오게 되면 위에서 정의한 1,2,3외에 건네준 email까지 메일송신
        else:
            email_recipients.extend(email.split(',')) # 전달인자로 여러개의 email을 ,를 기준으로 입력한다면 1,2,3외에 건네준 email들에게 추가로 메일 송신

# ... 이하 메일보내는 함수 및 DAG정의

 

위와 같이 email 매개변수를 받아서 이메일 수신자를 다르게 처리하는 로직을 함수로 구현합니다.

그리고는 DAG에 사용되는 DEFAULT_ARGS의 on_failure_callback 부분에 lambda를 통해 실패(성공)시 email을 전송하는 함수를 전달인자(팀)와 함께 실행시킵니다.

 

또한 위의 comment에도 언급 했듯이, 전달인자로 email혹은 email들을 건네준다면 추가적으로 메일을 송신할 수 있게끔 구성해두었습니다.

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'on_failure_callback': lambda context: AlertUtils.failure_callback(context, "팀1")
}

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'on_failure_callback': lambda context: AlertUtils.failure_callback(context, "팀2")
}

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'on_failure_callback': lambda context: AlertUtils.failure_callback(context, "test")
}

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'on_failure_callback': lambda context: AlertUtils.failure_callback(context, "spidyweb@gmail.com")
}

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'on_failure_callback': lambda context: AlertUtils.failure_callback(context, "spidyweb@gmail.com,spidyweb2@gmail.com")
}

with DAG(
        dag_id='email-alert',
        default_args=DEFAULT_ARGS,
...
) as dag:

Slack

1. slack채널 생성 및 설정

https://api.slack.com 접속 > Your apps > Create your first app

From scratch > 앱이름(소문자와 '-'로 구성) 과 슬랙 워크스페이스 선택 > Create App

Incoming Webhooks 선택 > On > Add New Webhook to Workspace > 채널 선택 > 허용(ON)

 

webhook URL 확인

2. Airflow Connection 생성

password에 webhook URL을 기입

3. slack.py, DAGs 생성

slack.py 생성

import os
import sys
import json
import requests

from airflow.hooks.base import BaseHook
from airflow.providers.http.hooks.http import HttpHook


def task_fail_slack_alert(context):
    slack_url=BaseHook.get_connection('slack_connection').get_password()

    message = (
        ':red_circle: Task Failed. \n'
        + '*Dag*: {} \n'
        + '*Task*: {} \n'
        + '*Execution Time*: {} \n'
        + '*Log Url *: {} \n'
    ).format(context.get('task_instance').dag_id, context.get('task_instance').task_id, context.get('execution_date'), context.get('task_instance').log_url)
    print(message)

    payload={'channel' : 'airflow', 'text' : message, 'username' : 'airflow', 'icon_emoji' : 'false'}
    requests.post(slack_url, data=json.dumps(payload), headers={'Content-type': 'application/json'})


def task_success_slack_alert(context):
    slack_url=BaseHook.get_connection('slack_connection').get_password()

    message = (
        ':large_green_circle: Task Success. \n'
        + '*Dag*: {} \n'
        + '*Task*: {} \n'
        + '*Execution Time*: {} \n'
        + '*Log Url *: {} \n'
    ).format(context.get('task_instance').dag_id, context.get('task_instance').task_id, context.get('execution_date'), context.get('task_instance').log_url)
    print(message)

    payload={'channel' : 'airflow', 'text' : message, 'username' : 'airflow', 'icon_emoji' : 'false'}
    requests.post(slack_url, data=json.dumps(payload), headers={'Content-type': 'application/json'})

slack-test.py dag 생성

from airflow import DAG
import slacktools.slack as slack
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta


cmd="echo 'test succeed'"

default_args = {
    'owner': 'spidyweb',
    'retries': 0,
    'retry_delay': timedelta(seconds=20),
    'depends_on_past': False
}

with DAG(
    'airflow-slack-test',
    start_date=days_ago(2),
    default_args=default_args,
    schedule_interval='*/10 * * * *',
    catchup=False,
    tags=['airflow-slack-test'],
    is_paused_upon_creation=False,
	on_failure_callback=slack.task_fail_slack_alert,# DAGs 실패 시 슬랙에 알람
    on_success_callback=slack.task_success_slack_alert,# DAGs 성공 시 슬랙에 알람
) as dag:
    test_task = BashOperator(
    task_id='test_task',
    bash_command=cmd,
    )
    
	test_task2 = BashOperator(
    task_id='test_task2',
    bash_command='sleep 10',
    )
    
	
#의존관계 구성
    test_task >> test_task2

 

 

Success

Failure

 

4. 특정 task 이후까지의 상세 결과 알림 받기

받을 수 있는 상세 결과 종류로는

테이블 명, 특정 변수명, batch 결과 row 수, crawler명 등 다양한 것을 받을 수 있겠습니다.

from airflow import DAG
import slacktools.slack as slack
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
from datetime import datetime as dt
from airflow.hooks.base import BaseHook
import pendulum
import os
import sys
import json
import requests

local_tz=pendulum.timezone("Asia/Seoul")
today = dt.now(local_tz)

cmd="echo 'test succeed'"

default_args = {
    'owner': 'spidyweb',
    'retries': 0,
    'retry_delay': timedelta(seconds=20),
    'depends_on_past': False
}
def ETL_success_slack_alert(**kwargs):
    slack_url=BaseHook.get_connection('slack_connection').get_password()
    dag_id = kwargs["task_instance"].dag_id
	execution_date = kwargs["execution_date"]
    job_start_dtm = dt.strptime(dt.strftime(dt.strptime(str(execution_date), "%Y-%m-%dT%H:%M:%S.%f%z").astimezone(local_tz),"%Y-%m-%d %H:%M:%S%z"),"%Y-%m-%d %H:%M:%S%z")
    job_end_dtm = dt.strptime(dt.strftime(today,"%Y-%m-%d %H:%M:%S%z"),"%Y-%m-%d %H:%M:%S%z")
    total_sec = int((job_end_dtm - job_start_dtm).total_seconds())
    
    # 추가로 추출하는 row건수, 관련된 task_id, 작업 테이블 명, crawler 명 등등 넣으면 된다.
    # airflow 2.4.0 에서 execution_date는 The deprecated variable
    # job_start_dtm = dt.strptime(dt.strftime(execution_date, "%Y-%m-%dT%H:%M:%S%z"),"%Y-%m-%dT%H:%M:%S%z").astimezone(local_tz) 2.1.3 version
    # job_end_dtm = dt.strptime(dt.strftime(today,"%Y-%m-%d %H:%M:%S%z"),"%Y-%m-%d %H:%M:%S%z") 2.1.3 version
    # total_sec = int((job_end_dtm - job_start_dtm).total_seconds())

    message = (
        f'''
--------------------------------------------------------------------------
:large_green_circle: *JOB상태*: Succeed\n
:black_small_square: *Dag*: {dag_id}\n
:black_small_square: *Execution Date*: {execution_date}\n
:black_small_square: *시작일시*: {job_start_dtm}\n
:black_small_square: *종료일시*: {job_end_dtm}\n
:black_small_square: *소요시간(Sec)*: {total_sec} Secs\n
            '''
    )
    payload={'channel' : 'airflow', 'text' : message, 'username' : 'airflow', 'icon_emoji' : 'false'} # 채널 airflow로 변경해야 함
    requests.post(slack_url, data=json.dumps(payload), headers={'Content-type': 'application/json'})


with DAG(
    'airflow-slack-result',
    start_date=days_ago(2),
    default_args=default_args,
    schedule_interval='*/10 * * * *',
    catchup=False,
    tags=['airflow-slack-result'],
    is_paused_upon_creation=False,
	on_failure_callback=slack.task_fail_slack_alert,
    # on_success_callback=slack.task_success_slack_alert,# 특정 task까지 성공 후에 slack알람을 받기 위해 주석처리
) as dag:
    test_task = BashOperator(
        task_id='test_task',
        bash_command=cmd,
    )
    
	test_task2 = BashOperator(
        task_id='test_task2',
        bash_command='sleep 20',
    )

    # ETL 상세 결과 slack에 전송
    result_to_slack = PythonOperator(
        task_id='result_to_slack',
        python_callable=ETL_success_slack_alert,
        provide_context=True,
        trigger_rule="all_success"
    )
	
#의존관계 구성 result_to_slack 뒤에 email 전송 혹은 다른 작업이 있다고 가정 하에, 실제 batch작업 시간만을 측정하기 위해 slack 알림을 task로써 구성
    test_task >> test_task2 >> result_to_slack

결과


여기까지 Email 및 Slack을 이용한 알림 받기를 포스팅해봤는데, 블로그에 코드를 포스트하면서 indent가 제대로 맞지 않게 기입되는 경우가 있어서, python code같은 경우 indent가 중요하기 때문에 코드를 활용 하시려면 indent 잘 맞춰주시기 바랍니다.

 

2024-02-26 - 분기처리 통한 담당자에게 메일보내기 수정

728x90