[Spark] RDD action & transformation + Dataframe의 연산(operation) 분류
Spark는 분산 환경에서 데이터를 다루는 데 필요한 지연 처리방식(lazy evaluation)의 transformation과 즉시 실행 방식의 action을 제공
그리고 DataFrame과 Dataset의 transformation, action과 동일한 방식으로 동작한다.
1. RDD transformation
- Spark Transformation는 기존의 RDD에서 새로운 RDD를 생성하는 function
- Lazy 처리방식(lazy evaluation)이라서 action을 호출할 때 transformation이 실제로 실행 → transformation을 수행할 때 query plan만 만들고 실제로 메모리에 올리지는 않는다. action이 호출될 때 transformation을 메모리에 올려서 수행하고 action을 수행시킨다.
- transformation 종류
- Narrow transformation: 하나의 연산이 수행될 데이터가 하나의 노드에 바로 있어서 수행되는 것
- Wide transformation: 연산처리되어야 할 데이터 클러스터 노드 여기저기에 분산되어 있어서 연산처리 수행 시, 메모리 간의 전달량이 많아 지는 것, 네트워크 연산 처리량이 많아짐
- Narrow transformation: 하나의 연산이 수행될 데이터가 하나의 노드에 바로 있어서 수행되는 것
- Spark RDD Transformation 연산 종류
연산(operation) | 설명 |
distinct() | RDD의 distinct 메서드를 호출 |
filter() | SQL의 where절, RDD의 레코드를 모두 확인하고 조건 함수를 만족하는 레코드만 반환 |
map() | 주어진 입력을 원하는 값으로 반환하는 함수를 명시하고 레코드별로 적용 |
flatMap() | 단일 로우를 여러 로우로 변환해야 하는 경우에 사용하는 map의 확장버전 |
sortBy() | RDD를 정렬, 함수를 지정해 RDD의 데이터 객체에서 값을 추출한 다음 값을 기준으로 정렬 |
union() | RDD들의 elements를 합쳐서 새로운 RDD를 만들음 |
join() | 데이터베이스에서 쓰이는 조인개념, 두개의 테이블로 부터 공통된 값을 기준으로 필드를 결합 |
coalesce(), repartition() | output partition 수 조절할 때 사용 coalesce()와 repartition()의 차이는 셔플을 하냐 안하냐 차이인데, coalesce()는 full shuffling을 하지 않고, repartition()은 full shuffling을 함 |
groupByKey() | 한 쌍의 (K,V) 데이터셋이 있을 때, key K를 기준으로 데이터가 셔플되어서 새로운 RDD를 만들음 |
reduceByKey(func, [numTasks]) | 데이터가 셔플되기전에 같은 머신에 같은 key끼리 결합하여 key별로 count |
sortByKey() | 한 쌍의 (K,V) 데이터셋이 있을 때 이 데이터는 key K를 기준으로 정렬되어 새로운 RDD가 됨 |
select() | 특정 컬럼을 선택하여 DataFrame을 생성 |
selectExpr() | SQL 표현식을 사용하여 컬럼을 선택합니다. |
drop(cols) | 특정 컬럼을 제거합니다. |
filter() / where() | 조건에 따라 데이터를 필터링합니다. |
withColumn(colName, col) | 새 컬럼을 추가하거나 기존 컬럼을 수정합니다. |
withColumnRenamed(existingName, newName) | 컬럼 이름을 변경합니다. |
dropDuplicates([subset]) | 중복된 행을 제거합니다. |
limit(n) | 상위 n개의 행을 반환합니다. |
join(other, on, how) | 두 DataFrame을 조인합니다. |
groupBy(cols) | 컬럼 기준으로 그룹화합니다. |
orderBy(cols, ascending=True) | 컬럼 기준으로 정렬합니다. |
distinct() | 중복된 행을 제거합니다. |
repartition(numPartitions, cols) | 파티션을 다시 분배합니다. |
coalesce(numPartitions) | 파티션 수를 줄입니다. |
sample(withReplacement, fraction, seed) | 데이터의 샘플을 생성합니다. |
2. RDD action
- 실제의 데이터를 가지고 작업하고 싶을 때 action이 수행 됨
- action을 수행하게되면 rdd는 더이상 rdd가 아닌 non-rdd values로 바뀜
- action의 값들은 drivers에 저장되거나 외부의 storage system에 저장된다 → Laziness 한 RDD를 실제 동작으로 바꾸게 한다.
- Spark RDD action 연산 종류
연산(operation) | 설명 |
count() | RDD전체의 row수를 나타내고 RDD가아닌 int으로 바뀜 |
cellect() | driver program에 RDD전체 데이터를 리턴하고 RDD가 아닌 list로 바뀜 |
take(n) | 지정한 n만큼 RDD의 .요소를 리턴하고,RDD가 아닌 list로 바뀜 |
top(n) | 지정한 n만큼 RDD에서 큰 순서대로 리턴하고 RDD가아닌 list로 바뀜 |
countByValue() | RDD의 value별로 수를 count해서 return해주고 dict로 바뀐다. |
reduce() | RDD의 모든 값을 하나의 값으로 만들 때 사용, RDD가 아닌 다른 타입으로 바뀜 |
fold() | reduce()와 비슷, zero value(0)을 input으로서 사용한다. reduce()와의 가장 큰 차이는 reduce()는 빈 collection에 exception을 던지고 fold()는 빈 collection을 위해 정의되었다. |
aggregate() | input type으로부터 다른 데이터 타입을 얻을 수 있도록 유연성을 제공 |
first() | RDD의 요소중 가장 첫 번째 값을 리턴하고 RDD가 아닌 다른 타입으로 바뀜 |
max(), min() | RDD의 요소중 가장 큰 값, 가장 작은 값을 리턴하고 RDD가 아닌 다른 타입으로 바뀜 |
show([n, truncate]) | 데이터를 표 형태로 출력합니다. |
head([n]) | 상위 n개의 행을 반환합니다. |
toLocalIterator() | 모든 데이터를 로컬에서 순회 가능한 이터레이터로 반환합니다. |
collectAsList() | 모든 데이터를 리스트 형태로 반환합니다. |
write / writeStream | DataFrame을 외부 저장소에 저장합니다. |
3.Dataframe의 transformation, action 연산 분류
transformation | action |
distinct() | show() |
withColumn() | collect() |
withColumnRenamed() | count() |
filter(), where() | take() |
groupBy() | reduce() |
agg(sum,min,max,count...) | first() |
select() | describe() |
selectExpr() | explain() |
union(),unionAll() | |
sort(), orderBy() | |
drop() |
*spark.read나 spark.sql은 lazy transformation?
어떻게 보면 맞긴하다.
csv파일 데이터를 읽어오는데 스키마를 알기위해서 inferSchema옵션을 true로 주면, eager operation으로 메모리에 데이터를 다 올리지만,
parquet같이 메타데이터가 포함되어 있으면 file을 읽어올 때에는 lazy 하게 접근한다고 봐도 된다.
같은 맥락으로 csv파일도 schema를 직접 만들어서 read시에 .schema(정의한 스키마)처럼 주게되면 메모리를 사용하지 않고, lazy하게 접근한다.
spark.sql 또한 해당 sql을 논리적인 실행계획만 만들어두고, action이 있기 전까지는 메모리를 사용하지 않는다.
*df.write은 action?
action이다. 하지만 정확히는 basic action으로 분류된다.
*basic action이란?
연산자(메소드)들의 그룹이다
ex)
write, toDF, schema, printSchema(), createTempView()...
결론:
- transformation: RDD에서 봤듯이 Dataframe에서도 실제 데이터를 가지고 작업하지 않고 catalyst optimizer가 논리적 실행계획만 만드는 연산들이다.
- Action: RDD와 마찬가지로 Dataframe에서도 실제 데이터를 가지고 작업해야 하는 연산들, 즉 메모리에 올려야되는 연산들이다.
참조:
https://sparkbyexamples.com/spark/different-ways-to-create-a-spark-dataframe/
https://www.youtube.com/watch?v=ROktlHkcVBY
https://data-flair.training/blogs/spark-rdd-operations-transformations-actions/
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Dataset-basic-actions.html
https://towardsdatascience.com/a-brief-introduction-to-pyspark-ff4284701873