카테고리 없음
[Airflow] KubernetesPodOperator 사용법 정리
스파이디웹
2024. 4. 27. 13:10
728x90
1. KubernetesPodOperator란?
- Kubernetes 위에 정의한 자원과 빌드한 이미지를 Pod를 생성해 실행 시킬 수 있는 Airflow Operator
- CeleryExecutor, KubernetesExecutor 모두 가능하다(kubernetes cluster만 있다면)
2. Pod 설정 항목
- private image 저장소 URL
etl_image_url = "이미지 저장소 URL"
# task에 아래와 같이 사용
image=etl_image_url, # task 내 image property에 private image repository url을 입력
- image pull secrets
pod에 배포할 이미지를 가져올 때 secret 지정
from kubernetes.client import models as k8s
image_pull_secrets=[k8s.V1LocalObjectReference('registry-auth')],
# private repo에서 image를 pull하기 위해서는 미리 정의된 secret이 필요하다.
# https://kubernetes.io/ko/docs/tasks/configure-pod-container/pull-image-private-registry/
- image pull policy
이미지를 캐싱할지 항상 pull해올지 지정
image_pull_policy="Always",
# IfNotPresent: If you set the imagePullPolicy to IfNotPresent, Kubernetes will only pull the image when it doesn’t already exist on the node.
# Always: With your imagePullPolicy set to Always, Kubernetes will always pull the latest version of the image from the container registry.
# Never: If you set the imagePullPolicy to Never, there will be no attempts to pull the image.
- Pod의 resource
생성될 pod의 resource를 지정
pod_resources = k8s.V1ResourceRequirements(
limits={"memory": "2Gi", "cpu": "2"},
requests={"memory": "1Gi", "cpu": "1"}
)
- affinity
affinity = k8s.V1Affinity(
node_affinity={
'requiredDuringSchedulingIgnoredDuringExecution': {
'nodeSelectorTerms': [
{
'matchExpressions': [
{
'key': 'kubernetes.io/hostname',
'operator': 'In',
'values': [
'노드 이름',
]
}]
}
]
}
})
- in_cluster
Airflow가 Kubernetes 클러스터 내에서 실행 중인지 여부를 지정
→ k8s 클러스터에 대한 별도의 구성이나 인증 작업이 필요하지 않음
- get_logs
컨테이너의 표준출력을 로그로 가져올 건지 지정
get_logs=True,
3. 예제 코드
with DAG(
dag_id='k8spodoperator_test',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
start_date=two_days_ago,
schedule_interval=None,
catchup=False
) as dag:
etl_postgresql_M = KubernetesPodOperator(
name="k8spodoperator_test",
image=etl_image_url,# image='private_registry/image', #Placeholder for the actual URL
cmds=["python3", "test.py"],
image_pull_secrets=[k8s.V1LocalObjectReference('registry-auth')], # image pull secret 정의
labels={"foo": "bar"},
task_id="k8spodoperator_tes",
do_xcom_push=True,
in_cluster=True,
get_logs=True,
affinity=affinity, # affinity 정의
image_pull_policy="Always",
container_resources=pod_resources # 자원정의
)
4. 실행되는 방식
- KubernetesPodOperator를 실행 시키면, 지정한 pod 자원과 affinity로 pod를 실행 시켰는지 체크하는 pod와 실제로 지정한 자원과 affinity로 작업을 돌리게되는 pod가 각각 생성됨 → 2개의 pod가 뜨게 됨
- 체크하는 파드는 500m 512Mi로 자원이 세팅 되고, cluster affinity를 따라감
- 실제 작업하는 pod는 지정한 자원, affinity로 생성 됨
호출(체크)하는 pod의 자원
작업하는 pod의 자원
728x90