본문 바로가기
BigData/Spark & Spark Tuning

[Spark] Spark 실행 과정 by Catalyst Optimizer, Query plan 보는 법, Spark UI 보는 법

by 스파이디웹 2022. 1. 12.
728x90

Query Plan을 보기에 앞서 Spark 코드의 실행 과정을 알아 보겠습니다.

 

1. Spark 실행 계획

논리적 실행 단계(logical plan)

사용자의 코드를 논리적 실행 계획으로 변환

논리적 실행 계획 단계에서는 추상적 transformation만 표현하고, driver나 executor의 정보를 고려하지 않음

이 논리적 실행 계획으로 변환 시키는 데에는 여러 단계가 있는데,

 

1) unresolved logical plan(검증 전 논리적 실행 계획)

  • 코드의 유효성과 테이블이나 컬럼의 존재 여부만을 판단하는 과정, 실행 계획은 검증되지 않은 상태
  • spark analyzer는 컬럼과 테이블을 검증하기 위해 Catalog, 모든 테이블의 저장소 그리고 Dataframe 정보를 활용
  • 필요한 테이블이나 컬럼이 Catalog에 없다면 검증 전 논리적 실행 계획이 만들어지지 않음

2) logical plan(검증된 논리적 실행 계획)

  • 코드의 유효성과 구문상의 검증이 완료되어 논리적 실행 계획 생성
  • 테이블과 컬럼에 대한 결과는 Catalyst Optimizer로 전달

3) optimized logical plan(최적화된 논리적 실행 계획)

  • 검증된 논리적 실행계획을 Catalyst Optimizer로 전달하여 조건절 푸쉬 다운이나 선택절 구문을 이용해 논리적 실행 계획을 최적화함

물리적 실행 단계(physical plan)

Spark 실행 계획이라고도 불리는 물리적 실행 단계는 앞으로 알아 볼 Query plan보기, Spark UI보기에 표시 될 실행 계획

논리적 실행 계획을 클러스터 환경에서 실행하는 방법을 정의

 

1) physical plans(물리적 실행계획들)

  • 최적화된 논리적 실행 계획을 가지고 하나 이상의 물리적 실행 계획을 생성
  • 다양한 물리적 실행 전략을 생성(각 operator별 어떤 알고리즘을 적용할 지)하고 비용 모델을 이용해서 비교한 후 최적의 전략을 선택

2) Cost Model(비용 모델)

  • 가장 좋은 plan은 비용기반 모델(모델의 비용을 비교)
  • 테이블의 크기나 파티션 수 등 물리적 속성을 고려해 지정된 조인 연산 수행에 필요한 비용을 계산하고 비교함

3) Selected Physical Plan(최적의 물리적 실행 계획)

  • 위의 비용 모델에 따라 최적의 물리적 실행 계획을 도출

4) 물리적 실행 계획은 일련의 RDD와 transformation으로 변환(컴파일)

  • 각 머신에서 실행 될 Java Bytecode를 생성함(compile)

2. Query Plan + Spark UI

Query Plan은 위에서 언급 한 것과 같이 Physical plan을 의미하고, dataframe에 explain()을 함으로써 확인 할 수 있다.

Spark UI는 SparkSession이 생성되면 IP주소:port를 통해 접속할 수 있는데, 보통은 4040포트를 사용한다.

Spark UI의 SQL를 눌러보면 위의 explain()으로 확인한 Physical Plan을 UI로 확인할 수 있다.

단, Dataframe에 action이 실행되어야 물리적인 실행계획이 생성되기 때문에, UI의 SQL에서도 action이 실행되어야 확인할 수 있다.

Query Plan과 SparkUI에서의 각 실행 단계들은 대부분 아래의 단계들로 추려질 수 있습니다.

 

CollapseCodegenStages

UI에서 여러 연산자들이 큰 파란색 직사각형안에 그룹핑되어 있는데, codegen stages로 구별된다.

  • CollapseCodegenStages라는 규칙이 Catalyst Optimizer에 있는데, 이것은 코드 생성을 지원하는 연산자를 가져와 함께 축소하여 가상 함수 호출을 제거하여 실행 속도를 높이는 것

Scan [file format]

  • source 파일 포멧에 따라서 scan csv, scan json, scan parquet등 이름이 정해짐

Project

  • (컬럼/추가/필터링)
  • select
  • withColumn
  • when().otherwise()와 같은 select내에서의 필터링 및 조작

Exchange

  • 쉽게 말해서 Exchange는 shuffle, cluster내에서의 물리적인 데이터의 이동을 뜻함
  • 물리적인 데이터의 이동은 비용이 많이 들음
  • joins: Hash join이 일어날  때 Hash Partitiong Exchange가 일어남
  • Repartitions: n개의 파티션에 데이터를 랜덤으로 재분배할 경우, RoundRobinPartitioning Exchange가 일어남
  • Coalesce: 단일 executor로 데이터를 전부 이전시켜야 할 때 SinglePartition Exchange가 일어남
  • Sort: 결과 데이터를 정렬시킬 때(order by) RangePartitioning Exchange가 일어남
  • groupBy(), union()과 같은 연산자도 data shuffling(Exchange)을 함

Filter

  • 필터 조건이 쿼리에서 작성된 것대로 전부 계획에 반영되지 않고, Catalyst Optimizer가 중복된 필터는 제외시키고, 직관적인 새로운 필터를 적용시켜줌
  • where(), filter()에 해당

HashAggregate

  • count,sum,min,max등 data aggregation에 쓰임

join: BroadcastHashJoin & BroadcastExchange

  • 조인 단계에서 3가지 조인 알고리즘으로 분류되어 나타남
  • 이 알고리즘들은 equijoin(동등조인)에서만 사용되어야 하고 그렇지 않을 때는 성능이 현저히 떨어짐
  • BroadcastHashJoin:
    조인하는 dataframe중 한 쪽이 작은 크기일 때(몇 MB정도) 작은 DF는 모든 executor에 broadcast됨 (Exchange)
    그리고 큰 테이블과 Hash join을 함
  • ShuffledHashJoin:
    다른 한쪽의 Dataframe이 나머지 한 쪽 보다 최소 3배 이상 작고, 평균 파티션 크기가 broadcast하기에 충분히 작을 때 사용. partition들이 broadcast되고(exchange) Hash Join을 함
  • SortMergeJoin:
    가장 일반적인 조인. 위의 두가지 조인기법을 사용할 수 없을 때 사용. 양쪽에서 정렬되고, MergeSort join을 함
  • 이외에도 Cartesian Product Join이나 BroadcastNestedLoopJoin이 있지만 이 들은 좋은 성능을 내지 못함

*더 알게 되는 내용이 있거나, 수정할 내용이 있을 때 수정하겠습니다.

728x90

댓글