[Airflow] TO-BE Batch job 프로세스 개선 - 1) Airflow on k8s 이전(AWS EKS)
이번 포스트에는 NCP 플랫폼에서 새로 구축해본 airflow on k8s 경험을 토대로 저희 주된 배치를 관리하는 AWS 플랫폼 위에서의 Airflow 이전 경험을 공유드리려고 합니다.
기존에는 airflow 환경은 EC2한대에 올라가 있었고, IAC로 airflow가 관리되고 있지도 않았고, image에 대해서도 형상관리가 되지 않았습니다. 또한 서버에 대한 확장성이 모잘라서, SIGTERM 에러도 많이 발생 했었습니다.
포스트는 총 3개로 나뉘어 진행되며, 이번 포스트가 1번째 포스트입니다.
1. [Airflow] TO-BE Batch job 프로세스 개선 - 1) Airflow on k8s 이전 (AWS EKS)
https://spidyweb.tistory.com/543
2. [Airflow] TO-BE Batch job 프로세스 개선 - 2) 거버넌스, 표준, 형상 관리, 자동화, 프로세스 단축
https://spidyweb.tistory.com/544
3. [Airflow] TO-BE Batch job 프로세스 개선 - 3) DAG 이전 및 이슈 정리
https://spidyweb.tistory.com/545
1. AS-IS 구조
- EC2 단일 서버에 모든 것을 구성
- Airflow webserver
- Airflow Scheduler
- Airflow Logs(local에 파일로 저장)
- Airflow Metadatabase(PostgreSQL)
- DAG folder에서 python file 인식
AS-IS 신규 DAG 개발 프로세스
1. DAGs 개발 & commit & push
2. DAGs repository인 git lab에 저장
3. Jenkins를 통해 gitlab python file ec2(airflow server)와 동기화
4. Server 내 지정된 DAG경로의 Python file을 인식하여 DAGs를 UI 및 Scheduling에 적용
AS-IS Mart 생성 프로세스
1. procedure, SQL파일, python 내 SQL 등 Mart 생성 로직 통일화되지 않음
2. AWS Redshift내에서만 Procedure가 저장되어 형상관리가 되지 않음
3. Airflow 내에서 PythonOperator + boto3로 Procedure를 call하는 배치관리가 다수 존재
2. TO-BE 구조
Airflow 구성 요소(변경점)
- images
- jenkins 통해 빌드
- DAGs 개발 & 배포
- IDE 사용
- git lab & git Sync 동기화
- jenkins통한 배포 제거
- Logging
- S3에 log file 적재 및 참조
- Meta Database
- EC2 server에 있던 local DB → RDS
- eks secret 관리
- AWS Secrets Manager
- airflow 구성
- webserver pod
- scheduler pod
- git sync container 포함
- triggerer
- statsd pod
- metric 전송
- Dynamic worker pod
- DAGs 에 속한 task 당 하나의 pod가 동적으로 실행
- monitoring
- Prometheus
- Grafana
Airflow on k8s(TO-BE) 장점
- 확장성이 좋다
→ 쿠버네티스의 특징상 자원이 부족한 경우 확장을 쉽게 할 수 있음 - MetaDatabase 분리
→ monolothic이던 airflow 환경을 각각 분리하여, 어느 한 곳에서 에러가 나도 영향 가지 않게 분리 - jenkins를 통한 DAG 배포 과정 생략
→ gitsync를 통해 commit & push만 되어도 동기화 및 적용 가능 - logging의 원격 저장
→ s3에 logging을 저장함으로써 log를 봐야 할 때 접근하기가 용이 - secret 관리
→ 중요 정보를 secret을 통해서 보안 유지 가능 - monitoring + metric
→ Prometheus & Grafana를 통해 Airflow metric 모니터링 가능 - 이미지 형상관리 및 IAC(multi tenant)
→ 이미지 및 IAC를 통한 여러 부서에 독립적인 Airflow 제공 가능
1) Meta Database
airflow version 도 2.1.3 → 2.7.1 로 옮겨지므로 새로운 metadata table이 추가 되었음 password, extra field처럼 암호화가 된 컬럼에 대해서는 fernet을 통해 암호화하므로, AS-IS, TO-BE 동일한 fernet key가 설정 되어야 함
fernet key가 다를 경우 생기는 문제
테이블을 전부 옮길 필요 없이 메타데이터베이스 중 아래에 해당하는 것만 옮기면 됐다.
- connection
- ab_role
- ab_user
- ab_user_role
- variable
RDS에 이전할 때 한글데이터가 입력되지 않는 이슈가 있었는데, RDS에 기본적으로 설정된 값 때문이였음.
서버에서 default로 character_set_server를 latin1로 잡고 있기 때문에, 한글데이터가 입력이 되지 않는 문제 발생
CHARSET: utf8mb4 COLLATE: utf8mb4_general_ci
2-1) S3(remote logging)
connection
aws의 access, secret key와 region 정보를 넣는다.
values-prd.yaml
logging: {remote_logging: 'True', colored_console_log: 'False', remote_base_log_folder: 's3위치',
remote_log_conn_id: aws_dev_default, logging_level: INFO}
2-2) PV(persistence volume)로 logging
- storageclass와 PVC필요
- PVC는 storageclass로부터 생성되는 airflow helm에서 정의한 기본 pvc로 생성 됨(airflow-logs)
- PV 또한 PVC에 정의된 것에 맞게 생성되고 바인딩 됨
# values.yaml 내부의 logs 지정부분 아래와 같이 설정한 경우 storageClass에 의해 PVC가 생성되고 PV가 매핑 됨
logs:
persistence:
enabled: true
size: 10Gi
annotations: null
storageClassName: eks-nas-csi
existingClaim: null
3) Secret(AWS Secrets Manager)
External Secrets Operator (ESO)
🔑 External Secrets Controller — 외부 API에서 시크릿을 가져와 Kubernetes 시크릿을 생성하는 Kubernetes 컨트롤러입니다. 외부 API에서 시크릿이 변경되면 컨트롤러가 클러스터의 상태를 조율하고 시크릿을 업데이트합니다.
🔑 ExternalSecret — 시크릿 데이터를 가져올지를 지정하는 사용자 정의 리소스 정의입니다. 이는 데이터에 액세스하는 방법을 알고 있는 SecretStore를 참조합니다. 컨트롤러는 ExternalSecret를 시크릿을 생성하는 청사진으로 사용합니다.
🔑 SecretStore — 외부 API에서 시크릿을 가져오기 위해 필요한 액세스를 지정하는 사용자 정의 리소스 정의입니다. SecretStore는 인증 및 액세스를 처리합니다.
두 가지 종류의 SecretStore 리소스가 있습니다:
🔑 ClusterSecretStore — 모든 네임스페이스에서 참조할 수 있는 전역 클러스터용 SecretStore입니다. 시크릿 공급자에 대한 중앙 게이트웨이를 제공하기 위해 사용할 수 있습니다.
🔑 SecretStore — 단일 네임스페이스에서만 참조할 수 있는 네임스페이스용 SecretStore입니다.
Install External Secrets Operator using Helm
helm repo add external-secrets https://charts.external-secrets.io
helm repo update
Install ESO in external-secrets namespace
helm upgrade --namespace external-secrets --create-namespace --install --wait external-secrets external-secrets/external-secrets
Verify ESO installation
kubectl -n external-secrets get all
Create an IAM user and attach the managed policy
aws iam create-user --user-name external-secrets
aws iam attach-user-policy --user-name external-secrets --policy-arn arn:aws:iam::aws:policy/SecretsManagerReadWrite
aws iam create-access-key --user-name external-secrets
{
"AccessKey": {
"UserName": "external-secrets",
"AccessKeyId": "엑세스키",
"Status": "Active",
"SecretAccessKey": "시크릿키",
"CreateDate": "2024-05-28T02:00:41+00:00"
}
}
access key , secret key file로 관리
echo -n "엑세스키" > access-key
echo -n "시크릿키" > secret-access-key
kubectl -n default create secret generic awssm-secret --from-file=./access-key --from-file=./secret-access-key
secret manager secret 생성
✅ 어떤 git_sync_username을 사용해야 되는가?
→
젠킨스 ID/PW
일반 secret으로 하면 성공, external secret을 통한 secret생성은 failed
→ aws secret manager는 알아서 base64로 인코딩 되기 때문에 2번 인코딩 되고 있던 문제 발생
aws secretsmanager create-secret --name git-credentials --secret-string '{"GIT_SYNC_USERNAME":"이름","GIT_SYNC_PASSWORD":"비밀번호"}' --region ap-northeast-2
Create a cluster-scoped secret store
apiVersion: external-secrets.io/v1beta1
kind: ClusterSecretStore
metadata:
name: global-secret-store
spec:
provider:
aws:
service: SecretsManager
region: ap-northeast-2
auth:
secretRef:
accessKeyIDSecretRef:
name: awssm-secret
key: access-key
namespace: default
secretAccessKeySecretRef:
name: awssm-secret
key: secret-access-key
namespace: default
Create ExternalSecret to fetch the secret data
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
name: git-credentials
namespace: airflow
spec:
refreshInterval: 1m
secretStoreRef:
name: global-secret-store
kind: ClusterSecretStore
target:
name: git-credentials
creationPolicy: Owner
dataFrom:
- extract:
key: git-credentials
4) Airflow Image build(library 추가)
1. tools
- Jenkins
- AWS ECR(Airflow image repo)
2. AWS ECR push를 위한 Secret 생성
https://kubernetes.io/ko/docs/tasks/configure-pod-container/pull-image-private-registry/
AWS Secret manager에 cli를 통해 값을 생성 할 때는 \”처럼해야 “가 인식됨
3. ExternalSecret 생성
---
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
name: registry-auth
namespace: airflow
spec:
refreshInterval: 1m
secretStoreRef:
name: global-secret-store
kind: ClusterSecretStore
target:
name: registry-auth
creationPolicy: Owner
dataFrom:
- extract:
key: registry-auth
4. jenkins
jenkins와 devops.jenkins파일 생성 및 빌드
⚠️ org.jenkinsci.plugins.scriptsecurity.sandbox.RejectedAccessException: Scripts not permitted to use method org.yaml.snakeyaml.Yaml load java.lang.String
등등 yaml관련 snakeyaml 사용 시 빌드가 허용되지 않는 문제 발생 →
jenkins 관리 → in-process Script approval → Signatures already approved: 아래 사진과 같이 추가
5. values-prd.yaml
images:
airflow: {repository: repo 주소, tag: 태그, digest: null, pullPolicy: IfNotPresent}
useDefaultImageForMigration: false
...
registry:
secretName: registry-auth
connection: {}
6. requirements.txt
필요 library 목록
pymysql
pendulum
suds
...
5) airflow webserver + ingress(LB)
dns와 ingress를 붙이지 않고, 임시로 사용하려면 portforwarding을 하면 된다.
임시 사용(port forwarding)
- kubectl port-forward svc/airflow-webserver 18080:8080 -n airflow --address=0.0.0.0
- NodePort이니까, sg열고 node ip: 18080으로 접근
DNS 매핑(+ loadbalancer)
route53을 통해 생성한 DNS(CNAME or A레코드)에 loadbalancer를 매핑해주면 됨
⚠️ backend service does not exist 뜸
원인:
kubectl describe ingress airflow-webserver-ingress -n airflow를 통해 확인해 본 결과
IAM 권한 중 eksctl-prd-eks-cluster-addon-iamserviceaccou-Role1에 대해서 AddTag 권한이 없다고 나옴
→ 확인 해보니 policy 자체는 있는데, condition이 문제
ingress Failed deploy model due to AccessDenied:is not authorized to perform: elasticloadbalancing:AddTags on resource: because no identity-based policy allows the elasticloadbalancing:AddTags action status code: 403,
아래 JSON에서 "aws:RequestTag/elbv2.k8s.aws/cluster": “true", → “false”로 전환
{ "Effect": "Allow", "Action": [ "elasticloadbalancing:AddTags", "elasticloadbalancing:RemoveTags" ], "Resource": [ "arn:aws:elasticloadbalancing:*:*:targetgroup/*/*", "arn:aws:elasticloadbalancing:*:*:loadbalancer/net/*/*", "arn:aws:elasticloadbalancing:*:*:loadbalancer/app/*/*" ], "Condition": { "Null": { "aws:RequestTag/elbv2.k8s.aws/cluster": "false", "aws:ResourceTag/elbv2.k8s.aws/cluster": "false" } } },
→ 이전의 503에러가 나왔던 airflow쪽 listner rule이 정상적으로 돌아옴 + target group 생성 됨 + ingress describe해보면 정상적으로 나옴
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: airflow-webserver-ingress
namespace: airflow
annotations:
alb.ingress.kubernetes.io/load-balancer-name: alb
alb.ingress.kubernetes.io/subnets: subnet
alb.ingress.kubernetes.io/listen-ports: '[{"HTTPS": 443}]'
alb.ingress.kubernetes.io/certificate-arn: cert
alb.ingress.kubernetes.io/healthcheck-path: /health
alb.ingress.kubernetes.io/group.name: eks-private-management
spec:
ingressClassName: alb
rules:
- host: DNS주소
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: airflow-webserver
port:
number: 8080
6) monitoring (prometheus & grafana)
airflow를 구성하는 pod에 대한 memory & cpu사용률을 확인하고 싶으면 아래의 명령어를 사용
kubectl top pods -n airflow
prometheus & grafana (airflow metrics)
fluentbit를 k8s에 설치하고 설정하게 되면,
airflow statsd의 metric을 kubernetes-service-endpoints job이 수집하게 된다.
(설치하지 않더라도, statsd를 통해 prometheus로 metric을 전송할 수 있음.)
7) values.yaml 필요한 설정 + Dockerfile구성
Dockerfile
FROM apache/airflow:2.7.1
# 필요한 추가 라이브러리들을 설치합니다.
COPY requirements.txt .
RUN pip install --no-cache-dir "apache-airflow==2.7.1" -r requirements.txt
values.yaml
- executor
executor: KubernetesExecutor
- image
images:
airflow: {repository: 이미지주소,
tag: 태그, digest: null, pullPolicy: IfNotPresent}
useDefaultImageForMigration: false
- affinity
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/hostname
operator: In
values: [노드이름]
- metadatabase + fernet key
data:
metadataSecretName: null
resultBackendSecretName: null
brokerUrlSecretName: null
metadataConnection: {user: 유저, pass: 비밀번호, protocol: mysql(postgresql), host: 호스트,
port: 포트, db: airflow, sslmode: disable}
resultBackendConnection: null
brokerUrl: null
fernetKey: fernetkey
fernetKeySecretName: null
webserverSecretKey: secretkey
webserverSecretKeySecretName: null
- logging
config:
core: {dags_folder: '{{ include "airflow_dags" . }}', load_examples: 'False', executor: '{{
.Values.executor }}', colored_console_log: 'False', remote_logging: '{{- ternary
"True" "False" .Values.elasticsearch.enabled }}'}
logging: {remote_logging: 'True', colored_console_log: 'False', remote_base_log_folder: 's3uri',
remote_log_conn_id: aws_default, logging_level: INFO}
- gitsync
dags:
persistence:
annotations: {}
enabled: false
size: 1Gi
storageClassName: null
accessMode: ReadWriteOnce
existingClaim: null
subPath: null
gitSync:
enabled: true
repo: dag repo 주소
branch: prd
rev: HEAD
depth: 1
maxFailures: 0
subPath: ''
credentialsSecret: git-credentials
wait: 20
containerName: git-sync
uid: 65533
securityContext: {}
securityContexts:
container: {}
extraVolumeMounts: []
env: []
resources: {}
이상으로 AS-IS 와 TO-BE의 Airflow 환경에 대한 변경점을 정리해보았습니다.
언젠가 따로 executor별 차이점을 정리해야 겠다고 생각은 들지만,
KubernetesExecutor를 사용할 때의 장점을 요약하면, kubernetes 스럽게 airflow를 사용할 수 있다는 것 입니다.
worker가 평소에는 상주하지 않다가, task가 실행되는 경우, pod로써 비로소 올라오게 되는 특징이 있어, 자원을 효율적으로 사용할 수 있습니다.
단점으로는 pod가 뜨는데 시간이 걸리기 때문에, 약간의 지연이 발생할 수 있습니다.
참조:
https://www.giantswarm.io/blog/manage-kubernetes-secrets-using-aws-secrets-manager
https://kubernetes.io/ko/docs/tasks/configure-pod-container/pull-image-private-registry/