본문 바로가기
BigData/Spark & Spark Tuning

[Spark] EMR Serverless + Airflow로 spark job 제출해보기 (EmrServerlessStartJobOperator, boto3 + PythonOperator)

by 스파이디웹 2023. 8. 27.
728x90

이번 포스트는 EMR Serverless로 전환하면서 생긴 꿀팁들과 Airflow로 EMR Serverless에 Spark job을 제출하는 것을 포스팅하려고 합니다.

 

이번 포스트의 목차

  • EMR Serverless란?
  • EMR Serverless로 전환 이유
  • EMR Serverless로 전환 대상
  • EMR Serverless 생성 방법
  • EMR에 Airflow로 Spark job 제출하는 방법(EmrServerlessStartJobOperator)
  • EMR에 Airflow로 Spark job 제출하는 방법(boto3 + PythonOperator)
  • 전환 시 얻은 효과(성능, 비용)

EMR Serverless란?

EMR(Elastic Map Reduce) 서비스를 인프라 관리할 필요 없이 Serverless로서 제공하는 AWS의 제품

(Spark과 Hive 작업을 제출 할 수 있음)

 

1. 요금

요금은 Job을 실행시키면서 사용한 vCPU와 Memory당 비용으로 청구 됩니다.

시간당 vCPU당 0.060528 USD
시간당 GB당 0.006643 USD

 


EMR Serverless로 전환 이유

현재 내가 다니는 회사는 EMR을 매 배치 작업마다 create하고 Spark job을 step으로 제출하고 termination 까지 이뤄지는 일련의 절차를 따라가고 있었습니다.

요즘 대세는 인프라를 관리할 필요없는 Serverless이기도 하고, 비용적으로도 절감이 된다는 많은 use case들이 있었기 때문에 EMR Serverless 전환 POC와 더불어 전환 가능한 몇가지 batch job들을 전환하기로 하엿습니다.

 


EMR Serverless로 전환 대상

Serverless가 마냥 좋아서 무조건 전환해야된다? → X

Serverless가 항상 좋은 건 아닙니다. 처음 시도하는 환경이라 dependency(5.28.0 -> 6.11.0) 및 여러가지 새로운 문제도 분명 생길 것이고 여러 자료에서는 큰 작업을 돌릴 경우, 기존의 EMR보다 더 비용이 많이 나올 수 있다고 합니다.

 

실제로도 athena같은 serverless 기반의 쿼리당 비용을 받는 제품들을 잘못사용하는 경우, 기업들은 엄청난 금액을 내게될 수 있습니다.

그런경우 EC2로 분석용 쿼리를 날릴 수 있게 구축하거나, 상시 EMR을 띄우기도 합니다.

 

그래서 저의 경우, 너무 heavy하거나, job 실패 시 서비스에 치명적인 job들을 제외한 나머지를 EMR Serverless로 전환 하였습니다.

 


EMR Serverless 생성 방법

Airflow의 EmrServerlessCreateApplicationOperator 를 사용할 수도 있지만,

굳이 Airflow로 생성하지 않더라도 콘솔에서 직접 생성 할 수 있습니다.

EMR Serverless는 Application을 만든다고 하는데, EMR과 마찬가지로 여러 APP을 만들 수 있습니다.

Batch단위 혹은 JAR단위로 만드는 것을 고려하여 생성하면 됩니다.

 

APP 생성 UI를 보면 이름 넣는 란, Job 종류(Spark, Hive), EMR version을 선택할 수 있고 Architecture를 선택할 수 있습니다.

 

1. Release Version

EMR-6.11.0을 선택한 이유는 Spark 3.x를 사용하면서, Java 17을 사용 할 수 있기 때문입니다.

이후에 정리하겠지만, spark3.x버전을 사용했을 때 얻을 수 있는 성능적인 이점은 정말 많습니다.

그리고 JAVA 17은 new garbage collector인 ZGC가 도입됐기 때문에, 해당 기능을 이용하기위해 EMR-6.11.0을 선택했습니다.

 

2. Archtecture

arm64를 선택했는데, 그 이유는 Graviton2를 사용해서 비용적으로 좀더 싸기 때문에 arm64를 선택했습니다.

 


3. Pre-initialized capaticy

EMR Serverless가 시작될 때의 spark properties라고 보면 되는데, 위가 기본값인데, EMR Serverless는 dynamic Allocation이 true로 설정되어 있다. 그리고 시작 executor값이 3이기 때문에 위와 같이 설정되어 있습니다.

 

우리 같은경우는 기본값으로도 두고, 조금 heavy한 job같은 경우는 시작값을 크게 주고 job을 돌려도 봤습니다.

 

시작값을 조금 크게 주고 시작하는 경우, 확실히 job이 빨리 끝나긴 하는 것 같지만 쓴만큼 비용이 나오는 serverless의 경우 비용도 그만큼 늘어난 것을 확인 할 수 있었습니다.

 

4. Application limits

위의 값은 기본값 그대로 뒀었는데, 무거운 job을 전환하면서 해당 값보다 더 많은 자원이 필요한 경우 job이 계속 pending되고 비용이 어마무시하게 청구될 수도 있습니다.


이외에 network connection만 EMR관련 VPC로 설정하고 나머진 기본값을 사용했습니다.

 


EMR에 Airflow로 Spark job 제출하는 방법(EmrServerlessStartJobOperator)

Airflow dag에서 spark job을 제출해보겠습니다. 해당하는 Operator는 apache-airflow-providers-amazon==6.0.0 에서 지원합니다.

 

1. Spark Code

테스트를 위해 spark code는 csv를 parquet로 변환하는 코드를 사용하겠습니다.

from pyspark.sql import SparkSession


if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()
        
    df = spark.read.option("header","true").csv("s3://ny-vehicle/source/")

    df.write.mode("overwrite").option("header", "true").csv("s3://ny-vehicle/target/")

    spark.stop()

2. IAM role

role 생성 - custom role

{
     "Effect": "Allow",
     "Action": "iam:PassRole",
     "Resource": "arn:aws:iam::1234567890:role/JobRuntimeRoleForEMRServerless",
        "Condition": {
                "StringLike": {
                    "iam:PassedToService": "emr-serverless.amazonaws.com"
                }
            }
}

 

policy 추가

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadAccessForEMRSamples",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*.elasticmapreduce",
                "arn:aws:s3:::*.elasticmapreduce/*"
            ]
        },
        {
            "Sid": "FullAccessToS3Bucket",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET",
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET/*"
            ]
        },
        {
            "Sid": "GlueCreateAndReadDataCatalog",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:CreateDatabase",
                "glue:GetDataBases",
                "glue:CreateTable",
                "glue:GetTable",
                "glue:UpdateTable",
                "glue:DeleteTable",
                "glue:GetTables",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:CreatePartition",
                "glue:BatchCreatePartition",
                "glue:GetUserDefinedFunctions"
            ],
            "Resource": ["*"]
        }
    ]
}

3. Airflow DAGs

* Airflow server 내의 Access Key, Secret Key등의 credentials 정보와, region이 명시되어 있어야 합니다.

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

# Replace these with your correct values
JOB_ROLE_ARN = "arn:aws:iam::533364636158:role/EMR-Serverless-Runtime-Role"

DEFAULT_MONITORING_CONFIG = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://emr-spark-bucket01/emr-log/"}
    },
}

default_args = {
    'owner': 'spidyweb',
    'retries': 0,
    'retry_delay': timedelta(seconds=20),
    'depends_on_past': False
}

with DAG(
    'emr-serverless-spark-job-test',
    start_date=days_ago(2),
    default_args=default_args,
    schedule_interval='@once',
    catchup=False,
    tags=['emr-serverless-spark-job-test'],
    is_paused_upon_creation=False,
) as dag:
    spark_test = EmrServerlessStartJobOperator(
        task_id="spark_test",
        application_id="00fcohjd0tp9mf2p",
        execution_role_arn=JOB_ROLE_ARN,
		name="emr_serverless_job",
        job_driver={
            "sparkSubmit": {
                "entryPoint": "s3://ny-vehicle/source_code/pyspark_test.py",
				"sparkSubmitParameters": "--conf spark.executor.extraJavaOptions=-Duser.timezone=Asia/Seoul --conf spark.driver.extraJavaOptions=-Duser.timezone=Asia/Seoul \
				--conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.aarch64 \
				--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.aarch64"
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )
	
	spark_test
"--conf spark.executor.extraJavaOptions=-Duser.timezone=Asia/Seoul \# UTC가 아닌 KST로 맞추기 위한 파라미터
--conf spark.driver.extraJavaOptions=-Duser.timezone=Asia/Seoul \# UTC가 아닌 KST로 맞추기 위한 파라미터
--conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.aarch64 \# JAVA 17사용하기위한 파라미터
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.aarch64"# JAVA 17사용하기위한 파라미터

Airflow에 제출되고 Success된 모습

EMR Serverless에 지정한 job 이름으로 제출되고, 성공하는 모습

결과로 s3에 csv파일이 써진 모습


EMR에 Airflow로 Spark job 제출하는 방법(boto3 + PythonOperator)

airflow dag에서 Python Operator + boto3로 spark job을 제출해보겠습니다. 해당 작업을 수행하려면 

boto3>=1.23.9 여야 합니다.

pip3 install boto3==1.23.9

스파크 코드 및 IAM Role은 동일하며, airflow dag만 다릅니다.

1. Airflow DAGs

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import boto3

# Replace these with your correct values
JOB_ROLE_ARN = "arn:aws:iam::533364636158:role/EMR-Serverless-Runtime-Role"

DEFAULT_MONITORING_CONFIG = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://emr-spark-bucket01/emr-log/"}
    },
}

default_args = {
    'owner': 'spidyweb',
    'retries': 0,
    'retry_delay': timedelta(seconds=20),
    'depends_on_past': False
}

def emr_serverless_job_python_operator() -> bool:
    client = boto3.client("emr-serverless", region_name="ap-northeast-2")
	appliation_id = "00fcohjd0tp9mf2p"
	response = client.start_job_run(
	    applicationId=appliation_id,
		executionRoleArn=JOB_ROLE_ARN,
		job_driver={
            "sparkSubmit": {
                "entryPoint": "s3://ny-vehicle/source_code/pyspark_test.py",
				"sparkSubmitParameters": "--conf spark.executor.extraJavaOptions=-Duser.timezone=Asia/Seoul --conf spark.driver.extraJavaOptions=-Duser.timezone=Asia/Seoul \
				--conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.aarch64 \
				--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.aarch64"
            }
        },
		configurationOverrides=DEFAULT_MONITORING_CONFIG,
		name="emr_serverless_job_python_operator"	
		
		
with DAG(
    'emr-serverless-spark-job-test2',
    start_date=days_ago(2),
    default_args=default_args,
    schedule_interval='@once',
    catchup=False,
    tags=['emr-serverless-spark-job-test2'],
    is_paused_upon_creation=False,
) as dag:
    spark_test2 = PtyhonOperator(
        task_id='spark_test2',
		python_callable='emr_serverless_job_python_operator',
		dag=dag
	)
	
	spark_test2

Airflow dag 성공한 모습

*다만 Airflow dag는 제출만되면 success로 표현되기 때문에, EMR Stepsensor와 같은 로직을 파이썬 코드에 추가해줘야 합니다.

 

EMR Serverless에 제출된 job

결과로 s3버킷 내 생성된 csv파일


전환 시 얻은 효과(성능, 비용)

1. 성능

속도적인 측면에서는 기존에는 5.x version의 EMR을 사용했기 때문에 spark 3.x의 feature인 AQE를 사용할 수 없었는데,

emr 6.x 버전으로 전환되면서 복잡한 쿼리에서의 속도가 많이 향상됐으며, 단순 파케이 변환같은 작업은 오히려 시간이 늘어나는 경우도 있었습니다.(자원 튜닝을 하면 빨라지겠지만, 기본값으로 두고 dynamic allocation에 맡긴 결과)

 

결론적으로, 시간적인 측면에서 평균 15%~20%정도 감소했고, 오히려 늘어난 job들도 있다고 생각하시면 됩니다.

 

2. 비용

비용적으로는 확실히 절감이 됐습니다.

엄청나게 무거운 job을 serverless에 제출하지 않아서 비교는 못하지만, running time이 전체적으로 감소한 점(특히나 EMR on EC2의 provisioning되는 시간이 없기 때문에)퍼센테이지로 치면, job마다 다르지만 평균 10프로 내외였던 것 같습니다.

 

 

참조:

https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/using-airflow.html

https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/security-iam-runtime-role.html

728x90

댓글