728x90
아직까지도 많은 기업에서는 EMR을 원하는 시간대에 띄워서 batch job을 airflow schedule에 맞게 실행시키고 종료시키는 ETL 형태를 많이 사용하고 있습니다.
그래서 이번 포스트에는 Airflow로 EMR을 띄우고, spark job을 제출하고 job이 끝나는 대로 EMR을 종료시키는 DAG를 구성해보겠습니다.
Airflow는 미리 구성되어 있다고 가정하고 시작하겠습니다.
Airflow 구성부터 해보고 싶으시면 아래의 링크를 참조해주세요.
https://spidyweb.tistory.com/449
1. amazon provider, boto3 다운받기
# amazon providers 다운로드
$ pip3 install apache-airflow-providers-amazon
# boto3 다운로드
$ pip3 install boto3
2. s3에 spark file 업로드하기
python 파일
s3에 있는 csv파일을 읽어서, csv파일로 쓰는 코드
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("PythonPi")\
.getOrCreate()
df = spark.read.option("header","true").csv("s3://ny-vehicle/source/")
df.write.mode("overwrite").option("header", "true").csv("s3://ny-vehicle/target/")
spark.stop()
3. Airflow DAG 구성하기
from airflow import DAG
#(deprecated) from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
#(deprecated) from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
#(deprecated) from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
# from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
from datetime import datetime, timedelta
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False
}
# EMR Create 시 지정할 세부사항
# EMR을 private subnet에 생성
JOB_FLOW_OVERRIDES = {
"Name": "emr-madeby-airflow",
"LogUri": "s3://emr-spark-bucket01/emr-log/",
"ReleaseLabel": "emr-6.11.0",
"Instances": {
"Ec2KeyName": "EMR-server",
"Ec2SubnetId": "subnet-022547f8aa177945a",
"EmrManagedMasterSecurityGroup": "sg-093751a5dad11522b",
"EmrManagedSlaveSecurityGroup": "sg-061ac372306c45cdb",
"ServiceAccessSecurityGroup": "sg-018b4200d477c93ef",
"InstanceGroups": [
{
"Name": "Master node",
"Market": "ON_DEMAND",
"InstanceRole": "MASTER",
"InstanceType": "r5.xlarge",
"InstanceCount": 1,
"Configurations": [
{
"Classification": "yarn-site",
"Properties": {
"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"
}
},
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "true"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.network.timeout": "800s",
"spark.executor.heartbeatInterval": "60s",
"spark.dynamicAllocation.enabled": "true",
"spark.shuffle.service.enabled": "true",
"spark.driver.memory": "4G",
"spark.executor.memory": "4G",
"spark.executor.cores": "2",
"spark.executor.instances": "2",
"spark.executor.memoryOverhead": "1G",
"spark.driver.memoryOverhead": "1G",
"spark.driver.cores": "1",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:+InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError=\'kill -9 %p\'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:+InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError=\'kill -9 %p\'",
"spark.yarn.scheduler.reporterThread.maxFailures": "5",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.default.parallelism": "300"
}
}
]
},
{
"Name": "Slave nodes",
"Market": "ON_DEMAND",
"InstanceRole": "CORE",
"InstanceType": "r5.xlarge",
"InstanceCount": 1
}
],
"KeepJobFlowAliveWhenNoSteps": False,
"TerminationProtected": False
},
"Applications": [
{"Name": "Hadoop"},
{"Name": "Spark"}
],
"Tags": [
{
"Key": "Name",
"Value": "airflow-emr-test"
},
{
"Key": "sys",
"Value": "test"
}
],
"JobFlowRole": "EMR_EC2_DefaultRole",
"ServiceRole": "EMR_DefaultRole"
}
# EMR에 제출할 Steps 세부사항
Spark_Steps = [
{
'Name': 'spark-on-emr-test',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit', '--deploy-mode', 'cluster', '--master', 'yarn', '--name', 'spark-on-emr-test',
's3://ny-vehicle/source_code/pyspark_test.py'
]
}
}
]
# DAG 정의
with DAG(
dag_id='airflow-emr-create',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=1),
start_date=datetime(2023,9,10),
schedule_interval=None
) as dag:
# EMR 생성하기
create_cluster = EmrCreateJobFlowOperator(
task_id = 'create_emr_cluster',
job_flow_overrides = JOB_FLOW_OVERRIDES,
aws_conn_id = 'aws_default',
emr_conn_id = None,
region_name = 'ap-northeast-2'
)
# EMR에 step 제출하기
Spark_Step = EmrAddStepsOperator(
task_id = 'Spark_Step',
job_flow_id = '{{ task_instance.xcom_pull(task_ids="create_emr_cluster", key="return_value") }}',
aws_conn_id = 'aws_default',
steps = Spark_Steps
)
# EMR의 step이 잘 끝났는지 확인하기
Step_Sensor = EmrStepSensor(
task_id = 'Step_Sensor',
job_flow_id = '{{ task_instance.xcom_pull(task_ids="create_emr_cluster", key="return_value") }}',
step_id = '{{ task_instance.xcom_pull(task_ids="Spark_Step", key="return_value")[0] }}',
aws_conn_id = 'aws_default'
)
create_cluster >> Spark_Step >> Step_Sensor
- EMR StepSensor가 필요한 이유는 Airflow에서는 EMR에 Step을 제출하기만하면 끝난것으로 나오기 때문에, 실제로 EMR에서 job이 다끝났는지를 확인 하기 위해 StepSensorOperator를 사용하여 확인을 합니다.
- EMR을 생성할 때 정의했었던 property중에 하나인 KeepJobFlowAliveWhenNoSteps가 False로 되어있는경우, EMR에 할당된 Step이 완료됐을 시 EMR을 terminate시키게 됩니다.
- EMR이 생성될 subnet이나, master 노드, slave노드 및 serviceaccess SG를 private SG로 지정하여 생성
- 생성할 EMR의 노드 크기와 숫자에 따라 spark-defaults에 해당하는 memory, core를 적절하게 입력
4. 결과 확인하기
airflow DAG
EMR creation
EMR Step
Terminate EMR
Write CSV on S3
EMR 생성부터 Step(Spark job)제출, EMR Termination까지 잘 완료 된 것을 확인 할 수 있습니다.
EMR에 제출할 spark job이나 EMR 클러스터 configuration만 다르게 조정하여 EMR을 airflow로 제어할 수 있겠습니다.
728x90
'BigData > Apache Airflow' 카테고리의 다른 글
[Airflow] Airflow DAGs 이상감지, 알림받기, 결과전송 (EmailOperator, Slack) (0) | 2023.08.30 |
---|---|
[Airflow] Airflow로 ETL 파이프라인 만들기(python, EMR, glue crawler, Email, Slack, DB반영) (0) | 2023.08.07 |
[Airflow] Amazon linux2에 docker-compose로 airflow 설치하기 (0) | 2023.03.22 |
[Airflow] Airflow cluster, celery executor + flower + RabbitMQ 환경 구성하기 (0) | 2023.01.09 |
[Airflow] docker-compose.yml로 airflow 설치하기 (0) | 2023.01.07 |
댓글