BigData/Apache Airflow

[Airflow] TO-BE Batch job 프로세스 개선 - 3) DAG 이전 및 이슈 정리

스파이디웹 2024. 6. 22. 12:25
728x90

포스트는 총 3개로 나뉘어 진행되며, 이번 포스트가 3번째 포스트입니다.

1. [Airflow] TO-BE Batch job 프로세스 개선 - 1) Airflow on k8s 이전 (AWS EKS)

https://spidyweb.tistory.com/543

 

2. [Airflow] TO-BE Batch job 프로세스 개선 - 2) 거버넌스, 표준, 형상 관리, 자동화, 프로세스 단축

https://spidyweb.tistory.com/544

 

3. [Airflow] TO-BE Batch job 프로세스 개선 - 3) DAG 이전 및 이슈 정리

https://spidyweb.tistory.com/545


1. DAG 이전 경험 공유

  • 기존에 존재하던 다양한 명칭 → 표준화된 규칙에의해 .py , DAG 명 일괄 변경 및 생성에 적용, 배치주기를 파일 이름보고 파악할 수 있게 수정
  • DAG 프로젝트에 각 platform별 폴더 구조화, utils, sql, template,image등 폴더 구분
  • 변경 이력 주석으로 추가 하게 모든 template에 적용(대충 아래와 같은 템플릿으로)
#################################
# 작성 일자 # 작성자 # 변경사항 #
#################################
  • 하나의 의미 없던 owner → 각 생성자의 email 주소를 달아서 담당자 매핑
  • 각 dag에 description을 달아서 어떠한 job인지를 간략하게 표현
  • 기존에 있었던 프로시저 실행 → SQL파일 + PostgresOperator를 통해 코드가 형상관리 될 수 있고, 세분화된 task로 어디서 장애가 났는지 파악할 수 있게 변경(task의 세분화를 통한 작업 모니터링 세분화)
  • 다양한 코드 작성 스타일 → 포메팅 및, 신규 배치 작업 건에 대한 자동 생성 지원
    • code formatter black을 commit 이전에 실행될 수 있게 강제하여 동일한 코드포멧을 사용할 수 있게 함
    • dag_factory를 통해 사용자가 원하는 값을 입력하고, 정해진 template에 따라 dag파일을 생성 할 수 있게 함

2. DAG 이전 + TO-BE로 넘어가면서 생긴 여러 이슈 정리

1) Version Up에 따른 library 명칭 변경

AS-IS TO-BE
from airflow.providers.amazon.aws.operators.glue_crawler import AWSGlueCrawlerOperator from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator from airflow.providers.amazon.aws.operators.athena import AthenaOperator

 

2) PostgresOperator 에러

PostgresOperator를 통해 Redshift에 SQL쿼리를 날리고 있는데, airflow on k8s로 이전하고 부터 에러가 발생


⚠️ You are trying to use common-sql with AwsGenericHook, but its provider does not support it. Please upgrade the provider to a version that supports common-sql. The hook class should be a subclass of airflow.providers.common.sql.hooks.sql.DbApiHook
→ AWSGenericHook이라는게 Amazon Web Services connection Type을 쓰기에 생기는 에러

AS-IS
TO-BE

3) SSHOperator timed out

"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/ssh/operators/ssh.py", line 158, in run_ssh_client_command exit_status, agg_stdout, agg_stderr = self.ssh_hook.exec_ssh_client_command( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/ssh/hooks/ssh.py", line 546, in exec_ssh_client_command raise AirflowException("SSH command timed out") airflow.exceptions.AirflowException: SSH command timed out

→ airflow on k8s에 아래와 같이 cmd time out을 None으로 줘야 함(ssh가 shell파일을 실행시키고 오랫동안 성공 여부를 체크하는 경우) , k8s위에서의 문제인지 airflow version up의 문제인지는 파악되지 않음

cmd_timeout=None,

4) EMRServerless 권한에러

EMR Serverless 제출 시 emr-serverlessTagResource 권한 에러 발생

is not authorized to perform: emr-serverless:TagResource

→ EMRServerlessStudioAccessPolicy 추가

 

prd-iam-eks-nodegroup에 권한 추가 시 PassRole 에러 발생

is not authorized to perform: iam:PassRole on resource:

→ PassRole 추가

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "ec2:CreateNetworkInterface",
            "Resource": "arn:aws:ec2:*:*:network-interface/*",
            "Condition": {
                "StringEquals": {
                    "aws:CalledViaLast": "ops.emr-serverless.amazonaws.com"
                }
            }
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:CreateStudioPresignedUrl",
                "sso:DeleteManagedApplicationInstance",
                "emr-serverless:*",
                "elasticmapreduce:DescribeStudio",
                "elasticmapreduce:CreateStudio",
                "elasticmapreduce:DeleteStudio",
                "elasticmapreduce:ListStudios"
            ],
            "Resource": "*"
        },
        {
            "Sid": "VisualEditor2",
            "Effect": "Allow",
            "Action": [
                "iam:PassRole",
                "iam:CreateServiceLinkedRole"
            ],
            "Resource": "arn:aws:iam::618467231866:role/*"
        }
    ]
}

 

5) SSHOperator 방화벽

각 서버에 날리는 SSHOperator 명령어들이 방화벽에 의해 connection time out이 나옴

 

EKS worker node들이 속한 subnet에 대해서 SSHOperator대상이 되는 서버들의 inbound에 추가가 필요, 그렇지 않으면 connection time out이 나옴

 

이로써 AWS EKS위의 Airflow 이전 및 전체적인 ETL프로세스, 파이프라인 관리 개선에 대한 경험 공유 포스팅을 마치며, 수정이 필요할 때 다시 수정하겠습니다. 감사합니다.

728x90