카테고리 없음

[Airflow] KubernetesPodOperator 사용법 정리

스파이디웹 2024. 4. 27. 13:10
728x90

1. KubernetesPodOperator란?

  • Kubernetes 위에 정의한 자원과 빌드한 이미지를 Pod를 생성해 실행 시킬 수 있는 Airflow Operator
  • CeleryExecutor, KubernetesExecutor 모두 가능하다(kubernetes cluster만 있다면)

2. Pod 설정 항목

  • private image 저장소 URL
etl_image_url = "이미지 저장소 URL"


# task에 아래와 같이 사용
image=etl_image_url, # task 내 image property에 private image repository url을 입력

 

  • image pull secrets

pod에 배포할 이미지를 가져올 때 secret 지정

from kubernetes.client import models as k8s
image_pull_secrets=[k8s.V1LocalObjectReference('registry-auth')],
# private repo에서 image를 pull하기 위해서는 미리 정의된 secret이 필요하다.
# https://kubernetes.io/ko/docs/tasks/configure-pod-container/pull-image-private-registry/
  • image pull policy

이미지를 캐싱할지 항상 pull해올지 지정

image_pull_policy="Always",
# IfNotPresent: If you set the imagePullPolicy to IfNotPresent, Kubernetes will only pull the image when it doesn’t already exist on the node.
# Always: With your imagePullPolicy set to Always, Kubernetes will always pull the latest version of the image from the container registry. 
# Never: If you set the imagePullPolicy to Never, there will be no attempts to pull the image.
  • Pod의 resource

생성될 pod의 resource를 지정

pod_resources = k8s.V1ResourceRequirements(
    limits={"memory": "2Gi", "cpu": "2"},
    requests={"memory": "1Gi", "cpu": "1"}
)
  • affinity
affinity = k8s.V1Affinity(
    node_affinity={
        'requiredDuringSchedulingIgnoredDuringExecution': {
            'nodeSelectorTerms': [
                {
                    'matchExpressions': [
                        {
                            'key': 'kubernetes.io/hostname',
                            'operator': 'In',
                            'values': [
                                '노드 이름',
                            ]
                        }]
                }
            ]
        }
    })
  • in_cluster

Airflow가 Kubernetes 클러스터 내에서 실행 중인지 여부를 지정

→ k8s 클러스터에 대한 별도의 구성이나 인증 작업이 필요하지 않음

  • get_logs

컨테이너의 표준출력을 로그로 가져올 건지 지정

get_logs=True,

3. 예제 코드

with DAG(
        dag_id='k8spodoperator_test',
        default_args=DEFAULT_ARGS,
        dagrun_timeout=timedelta(hours=2),
        start_date=two_days_ago,
        schedule_interval=None,
        catchup=False
) as dag:
    etl_postgresql_M = KubernetesPodOperator(
        name="k8spodoperator_test",
        image=etl_image_url,# image='private_registry/image', #Placeholder for the actual URL
        cmds=["python3", "test.py"],
        image_pull_secrets=[k8s.V1LocalObjectReference('registry-auth')], # image pull secret 정의
        labels={"foo": "bar"},
        task_id="k8spodoperator_tes",
        do_xcom_push=True,
        in_cluster=True,
        get_logs=True,
        affinity=affinity, # affinity 정의
        image_pull_policy="Always",
        container_resources=pod_resources # 자원정의
    )

4. 실행되는 방식

  • KubernetesPodOperator를 실행 시키면, 지정한 pod 자원과 affinity로 pod를 실행 시켰는지 체크하는 pod와 실제로 지정한 자원과 affinity로 작업을 돌리게되는 pod가 각각 생성됨 → 2개의 pod가 뜨게 됨
  • 체크하는 파드는 500m 512Mi로 자원이 세팅 되고, cluster affinity를 따라감
  • 실제 작업하는 pod는 지정한 자원, affinity로 생성 됨

호출(체크)하는 pod의 자원

작업하는 pod의 자원

728x90