본문 바로가기
BigData/Apache Airflow

[Airflow] EMR create + Step 제출(Spark job) + StepSensor Dag 구성하기(feat. ETL)

by 스파이디웹 2023. 7. 30.
728x90

아직까지도 많은 기업에서는 EMR을 원하는 시간대에 띄워서 batch job을 airflow schedule에 맞게 실행시키고 종료시키는 ETL 형태를 많이 사용하고 있습니다.

그래서 이번 포스트에는 Airflow로 EMR을 띄우고, spark job을 제출하고 job이 끝나는 대로 EMR을 종료시키는 DAG를 구성해보겠습니다.

 

Airflow는 미리 구성되어 있다고 가정하고 시작하겠습니다.

 

Airflow 구성부터 해보고 싶으시면 아래의 링크를 참조해주세요.

https://spidyweb.tistory.com/449

 

[Airflow] Airflow cluster, celery executor + flower + RabbitMQ 환경 구성하기

이번 포스트에는 AWS EC2 3대로 구성된 airflow cluster를 CeleryExecutor로 설치해보겠습니다. 서버 구성 스펙 OS - Amazon linux2 AMI (HVM) - Kernel 5.10, SSD Volume type instance type - t2.Large 2vCPUs, 8GiB Memory storage - 30GB gp2

spidyweb.tistory.com

 


1. amazon provider, boto3 다운받기

# amazon providers 다운로드
$ pip3 install apache-airflow-providers-amazon
# boto3 다운로드
$ pip3 install boto3

2. s3에 spark file 업로드하기

python 파일

s3에 있는 csv파일을 읽어서, csv파일로 쓰는 코드

from pyspark.sql import SparkSession


if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()
        
    df = spark.read.option("header","true").csv("s3://ny-vehicle/source/")

    df.write.mode("overwrite").option("header", "true").csv("s3://ny-vehicle/target/")

    spark.stop()

3. Airflow DAG 구성하기

 

from airflow import DAG
#(deprecated) from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator

#(deprecated) from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator

#(deprecated) from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator

from datetime import datetime, timedelta

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False
}

# EMR Create 시 지정할 세부사항
# EMR을 private subnet에 생성
JOB_FLOW_OVERRIDES = {
    "Name": "emr-madeby-airflow",
    "LogUri": "s3://emr-spark-bucket01/emr-log/",
    "ReleaseLabel": "emr-6.11.0",
    "Instances": {
        "Ec2KeyName": "EMR-server",
        "Ec2SubnetId": "subnet-022547f8aa177945a",
        "EmrManagedMasterSecurityGroup": "sg-093751a5dad11522b",
        "EmrManagedSlaveSecurityGroup": "sg-061ac372306c45cdb",
        "ServiceAccessSecurityGroup": "sg-018b4200d477c93ef",
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "r5.xlarge",
                "InstanceCount": 1,
                "Configurations": [
                    {
                        "Classification": "yarn-site",
                        "Properties": {
                            "yarn.nodemanager.vmem-check-enabled": "false",
                            "yarn.nodemanager.pmem-check-enabled": "false"
                        }
                    },
                    {
                        "Classification": "spark",
                        "Properties": {
                            "maximizeResourceAllocation": "true"
                        }
                    },
                    {
                        "Classification": "spark-defaults",
                        "Properties": {
                            "spark.network.timeout": "800s",
                            "spark.executor.heartbeatInterval": "60s",
                            "spark.dynamicAllocation.enabled": "true",
                            "spark.shuffle.service.enabled": "true",
                            "spark.driver.memory": "4G",
                            "spark.executor.memory": "4G",
                            "spark.executor.cores": "2",
                            "spark.executor.instances": "2",
                            "spark.executor.memoryOverhead": "1G",
                            "spark.driver.memoryOverhead": "1G",
                            "spark.driver.cores": "1",
                            "spark.memory.fraction": "0.80",
                            "spark.memory.storageFraction": "0.30",
                            "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:+InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError=\'kill -9 %p\'",
                            "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:+InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError=\'kill -9 %p\'",
                            "spark.yarn.scheduler.reporterThread.maxFailures": "5",
                            "spark.storage.level": "MEMORY_AND_DISK_SER",
                            "spark.rdd.compress": "true",
                            "spark.shuffle.compress": "true",
                            "spark.shuffle.spill.compress": "true",
                            "spark.default.parallelism": "300"
                        }
                    }
                ]
            },
            {
                "Name": "Slave nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "CORE",
                "InstanceType": "r5.xlarge",
                "InstanceCount": 1
            }
        ],
        "KeepJobFlowAliveWhenNoSteps": False,
        "TerminationProtected": False
    },
    "Applications": [
        {"Name": "Hadoop"},
        {"Name": "Spark"}
    ],
    "Tags": [
        {
            "Key": "Name",
            "Value": "airflow-emr-test"
        },
        {
            "Key": "sys",
            "Value": "test"
        }
    ],
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole"
}

# EMR에 제출할 Steps 세부사항
Spark_Steps = [
    {
        'Name': 'spark-on-emr-test',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit', '--deploy-mode', 'cluster', '--master', 'yarn', '--name', 'spark-on-emr-test',
                's3://ny-vehicle/source_code/pyspark_test.py'
            ]
        }
    }
]


# DAG 정의
with DAG(
    dag_id='airflow-emr-create',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    start_date=datetime(2023,9,10),
    schedule_interval=None
) as dag:
    # EMR 생성하기
    create_cluster = EmrCreateJobFlowOperator(
        task_id = 'create_emr_cluster',
        job_flow_overrides = JOB_FLOW_OVERRIDES,
        aws_conn_id = 'aws_default',
        emr_conn_id = None,
        region_name = 'ap-northeast-2'
    )

    # EMR에 step 제출하기
    Spark_Step = EmrAddStepsOperator(
        task_id = 'Spark_Step',
        job_flow_id = '{{ task_instance.xcom_pull(task_ids="create_emr_cluster", key="return_value") }}',
        aws_conn_id = 'aws_default',
        steps = Spark_Steps
    )

    # EMR의 step이 잘 끝났는지 확인하기
    Step_Sensor = EmrStepSensor(
        task_id = 'Step_Sensor',
        job_flow_id = '{{ task_instance.xcom_pull(task_ids="create_emr_cluster", key="return_value") }}',
        step_id = '{{ task_instance.xcom_pull(task_ids="Spark_Step", key="return_value")[0] }}',
        aws_conn_id = 'aws_default'
    )

    create_cluster >> Spark_Step >> Step_Sensor
  • EMR StepSensor가 필요한 이유는 Airflow에서는 EMR에 Step을 제출하기만하면 끝난것으로 나오기 때문에, 실제로 EMR에서 job이 다끝났는지를 확인 하기 위해 StepSensorOperator를 사용하여 확인을 합니다.
  • EMR을 생성할 때 정의했었던 property중에 하나인 KeepJobFlowAliveWhenNoSteps가 False로 되어있는경우, EMR에 할당된 Step이 완료됐을 시 EMR을 terminate시키게 됩니다.
  • EMR이 생성될 subnet이나, master 노드, slave노드 및 serviceaccess SG를 private SG로 지정하여 생성
  • 생성할 EMR의 노드 크기와 숫자에 따라 spark-defaults에 해당하는 memory, core를 적절하게 입력

 

4. 결과 확인하기

airflow DAG

EMR creation

EMR Step

Terminate EMR

Write CSV on S3


EMR 생성부터 Step(Spark job)제출, EMR Termination까지 잘 완료 된 것을 확인 할 수 있습니다.

EMR에 제출할 spark job이나 EMR 클러스터 configuration만 다르게 조정하여 EMR을 airflow로 제어할 수 있겠습니다.

728x90

댓글