[Airflow] TO-BE Batch job 프로세스 개선 - 3) DAG 이전 및 이슈 정리
포스트는 총 3개로 나뉘어 진행되며, 이번 포스트가 3번째 포스트입니다.
1. [Airflow] TO-BE Batch job 프로세스 개선 - 1) Airflow on k8s 이전 (AWS EKS)
https://spidyweb.tistory.com/543
2. [Airflow] TO-BE Batch job 프로세스 개선 - 2) 거버넌스, 표준, 형상 관리, 자동화, 프로세스 단축
https://spidyweb.tistory.com/544
3. [Airflow] TO-BE Batch job 프로세스 개선 - 3) DAG 이전 및 이슈 정리
https://spidyweb.tistory.com/545
1. DAG 이전 경험 공유
- 기존에 존재하던 다양한 명칭 → 표준화된 규칙에의해 .py , DAG 명 일괄 변경 및 생성에 적용, 배치주기를 파일 이름보고 파악할 수 있게 수정
- DAG 프로젝트에 각 platform별 폴더 구조화, utils, sql, template,image등 폴더 구분
- 변경 이력 주석으로 추가 하게 모든 template에 적용(대충 아래와 같은 템플릿으로)
#################################
# 작성 일자 # 작성자 # 변경사항 #
#################################
- 하나의 의미 없던 owner → 각 생성자의 email 주소를 달아서 담당자 매핑
- 각 dag에 description을 달아서 어떠한 job인지를 간략하게 표현
- 기존에 있었던 프로시저 실행 → SQL파일 + PostgresOperator를 통해 코드가 형상관리 될 수 있고, 세분화된 task로 어디서 장애가 났는지 파악할 수 있게 변경(task의 세분화를 통한 작업 모니터링 세분화)
- 다양한 코드 작성 스타일 → 포메팅 및, 신규 배치 작업 건에 대한 자동 생성 지원
- code formatter black을 commit 이전에 실행될 수 있게 강제하여 동일한 코드포멧을 사용할 수 있게 함
- dag_factory를 통해 사용자가 원하는 값을 입력하고, 정해진 template에 따라 dag파일을 생성 할 수 있게 함
2. DAG 이전 + TO-BE로 넘어가면서 생긴 여러 이슈 정리
1) Version Up에 따른 library 명칭 변경
AS-IS | TO-BE |
from airflow.providers.amazon.aws.operators.glue_crawler import AWSGlueCrawlerOperator | from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator |
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator | from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator |
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator | from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator |
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor | from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor |
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator | from airflow.providers.amazon.aws.operators.athena import AthenaOperator |
2) PostgresOperator 에러
PostgresOperator를 통해 Redshift에 SQL쿼리를 날리고 있는데, airflow on k8s로 이전하고 부터 에러가 발생
⚠️ You are trying to use common-sql with AwsGenericHook, but its provider does not support it. Please upgrade the provider to a version that supports common-sql. The hook class should be a subclass of airflow.providers.common.sql.hooks.sql.DbApiHook
→ AWSGenericHook이라는게 Amazon Web Services connection Type을 쓰기에 생기는 에러
3) SSHOperator timed out
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/ssh/operators/ssh.py", line 158, in run_ssh_client_command exit_status, agg_stdout, agg_stderr = self.ssh_hook.exec_ssh_client_command( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/ssh/hooks/ssh.py", line 546, in exec_ssh_client_command raise AirflowException("SSH command timed out") airflow.exceptions.AirflowException: SSH command timed out
→ airflow on k8s에 아래와 같이 cmd time out을 None으로 줘야 함(ssh가 shell파일을 실행시키고 오랫동안 성공 여부를 체크하는 경우) , k8s위에서의 문제인지 airflow version up의 문제인지는 파악되지 않음
cmd_timeout=None,
4) EMRServerless 권한에러
EMR Serverless 제출 시 emr-serverlessTagResource 권한 에러 발생
is not authorized to perform: emr-serverless:TagResource
→ EMRServerlessStudioAccessPolicy 추가
prd-iam-eks-nodegroup에 권한 추가 시 PassRole 에러 발생
is not authorized to perform: iam:PassRole on resource:
→ PassRole 추가
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "ec2:CreateNetworkInterface",
"Resource": "arn:aws:ec2:*:*:network-interface/*",
"Condition": {
"StringEquals": {
"aws:CalledViaLast": "ops.emr-serverless.amazonaws.com"
}
}
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"elasticmapreduce:CreateStudioPresignedUrl",
"sso:DeleteManagedApplicationInstance",
"emr-serverless:*",
"elasticmapreduce:DescribeStudio",
"elasticmapreduce:CreateStudio",
"elasticmapreduce:DeleteStudio",
"elasticmapreduce:ListStudios"
],
"Resource": "*"
},
{
"Sid": "VisualEditor2",
"Effect": "Allow",
"Action": [
"iam:PassRole",
"iam:CreateServiceLinkedRole"
],
"Resource": "arn:aws:iam::618467231866:role/*"
}
]
}
5) SSHOperator 방화벽
각 서버에 날리는 SSHOperator 명령어들이 방화벽에 의해 connection time out이 나옴
→
EKS worker node들이 속한 subnet에 대해서 SSHOperator대상이 되는 서버들의 inbound에 추가가 필요, 그렇지 않으면 connection time out이 나옴
이로써 AWS EKS위의 Airflow 이전 및 전체적인 ETL프로세스, 파이프라인 관리 개선에 대한 경험 공유 포스팅을 마치며, 수정이 필요할 때 다시 수정하겠습니다. 감사합니다.