BigData/Spark & Spark Tuning

[Spark] RDD vs DataFrame 큰 차이점, Dataframe을 사용해야 되는 이유, RDD를 사용해야 하는 경우는? RDD, Dataframe 다루기

스파이디웹 2021. 12. 16. 17:11
728x90

RDD란? 정의 및 특징, dataframe과의 차이는?

RDD(Resilient Distributed Dataset)

특징 설명
Resilient
RDD lineage 그래프를 통한 fault-tolerant가 빠졌거나,node의 실패로 인한 손상된 파티션을 다시 실행시킨다.
Distributed 클러스터의 여러 노드에 데이터가 분산되어 저장
Dataset 원천 데이터값 혹은 값의 값들로 이루어진 파티션된 collection 데이터
(튜플 혹은 다른 레코드로써 사용될 수 있는 데이터 객체들)

RDD 특징

특징 설명
In-Memory RDD속의 데이터는 가능한 많이, 오래 메모리에 저장되어 있다.
불가변성(읽기 전용) 한번 생성되고 나면 변하지 않는다. transformation 연산을 통해 새로운 RDD로써 만들 수 있다.
lazy evaluated(느린 실행) RDD속의 데이터는 action으로 분류되는 연산자가 실행되기 전까지는 실제 수행되거나 transform되지 않는다.
cacheable data를 캐싱할 수 있다.
Parallel(병행처리) 병행 처리가 가능하다.
Typed(타입존재) RDD 레코드는 타입을 가지고 있다. RDD[long],RDD[int,String]등등
Partitioned 레코드들은 논리적인 파티션으로 분할되어 있고 클러스터의 여러노드에 분산저장되어 있다.

  • RDD는 불변성을 가지며 병렬로 처리할 수 있는 파티셔닝된 레코드의 모음
  • RDD는 각 record가 로우라는 개념이아닌, 객체
  • 모든값을 다루거나 값 사이의 상호작용 과정을 반드시 수동으로 정의해야 하는 RDD
  • 동일한 공간 효율성과 성능을 얻으려면 객체에 포맷 타입을 구현해 모든 RDD연산 과정에서 사용해야 함
  • 레코드의 내부 구조를 스파크에서 파악할 수 없다.
  • DataFrame
    • Dataframe의 각 레코드는 스키마를 알고 있는 필드로 구성된 구조화된 로우
    • 자동으로 데이터를 최적화하고 압축된 바이너리 포맷으로 저장
    • 필터 재정렬, 집계같은 최적화 기법도 자동으로 수행

RDD In Scala vs Python

  • RDD in Scala
    • 스칼라와 자바는 대부분 비슷한 성능이지만, 원형 객체를 다룰 때는 큰 성능 손실이 발생
  • RDD in Python
    • RDD를 파이썬으로 실행하는 것은 파이썬으로 만들어진 사용자 정의 함수를 사용해 로우마다 적용하는것과 동일
    • 직렬화과정을 거친 데이터를 파이썬 프로세스에 전달하고, 파이썬 처리가 끝나면 다시 직렬화하여 자바 가상 머신에 반환. 파이썬을 이용한 RDD는 높은 오버헤드가 발생한다.

Dataframe대신에 RDD는 언제 사용하면 좋을까

 

  • RDD는 고수준 API에서 제공하지 않는 기능이 필요한 경우(scala,java에 해당) python은 성능 손실이 크다.
    ex)
    클러스터의 물리적 데이터의 배치를 아주 세밀하게 제어해야 하는 상황
  • RDD를 사용해 개발된 기존 코드를 유지해야 하는 경우
  • 사용자 정의 공유 변수를 다뤄야 하는 경우(broadcast 변수, accumulator)
  • 정말 필요한 경우가아니면 RDD 수동 생성 x
  • RDD는 구조적 API(Dataframe)이 제공하는 여러 최적화 기법을 사용할 수 없다.(Dataframe은 효율적, 안정적, 표현력이 좋다)
  • 물리적으로 분산된 데이터(자체적으로 구성한 데이터 파티셔닝)에 세부적인 제어가 필요할 때 RDD를 사용

RDD 생성하는 다양한 방법, RDD를 DF로 만드는 방법

1. SparkContext를 통해 생성하는 방법(Local Collection으로부터 parallelize)

1) Local Collection

  • 컬렉션(collection)은 Python, Scala 등 언어에서 제공하는 기본 자료 구조
    • Python에서는 리스트, 튜플, 딕셔너리, range 등이 대표적
    • Scala에서는 List, Seq, Array, Set 등이 이에 해당
  • Spark에서는 로컬 컬렉션을 분산 환경에서 작업할 수 있도록 RDD로 변환할 때 활용

2) parallelize()

  • Spark의 parallelize() 메서드는 로컬 컬렉션을 기반으로 RDD를 생성
  • 즉, 로컬에 있는 데이터(컬렉션)를 Spark 클러스터의 분산 환경에 분산하여 작업할 수 있는 형태(RDD)로 변환하는 역할
  • parallelize()를 사용할 때 데이터는 Spark의 워커 노드들에 균등하게 나눠 저장되고 처리(분산처리를 위한 작업)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import findspark
findspark.init()
 
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
 
conf = SparkConf()
conf.setMaster("local[*]").setAppName("PySpark")
 
sc = SparkContext(conf=conf)
 
#1 range를 통해 RDD생성
rdd1=sc.parallelize(range(500))
print(type(rdd1))
 
#2 row를 통해 RDD생성
from pyspark.sql import Row
 
data = [Row(name="James,,Smith",lang=["Java","Scala","C++"],state="CA"), 
    Row(name="Michael,Rose,",lang=["Spark","Java","C++"],state="NJ"),
    Row(name="Robert,,Williams",lang=["CSharp","VB"],state="NV")]
 
rdd2=sc.parallelize(data)
 
#3 string을 split시켜 list를 만들고 RDD생성
myCollection= "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
 
#parallelize의 2번째 인자는 partition 수를 지정할 때 쓰임
words = sc.parallelize(myCollection, 5)
 
#4 직접 list를 만들어 RDD생성
list1=['a','ds','ewrew','wq','b']
listrdd=sc.parallelize(list1)
 


 

Scala 버전

import org.apache.spark.sql.Row

// Row 객체 생성
val data = Seq(Row("Alice", 25), Row("Bob", 30))

// Row 데이터를 기반으로 RDD 생성
val rdd = sc.parallelize(data)

rdd.collect().foreach(println)
// 출력: [Alice,25]
//         [Bob,30]

 


2. SparkSession을 통해 RDD만드는 방법

결국 SparkSession에서 제공하는 sparkContext를 사용하여 parallelize를 통해 RDD를 만들어야 함.

1
2
3
4
5
6
7
8
9
10
11
12
13
import findspark
findspark.init()
 
from pyspark.sql import SparkSession
#sparksession 드라이버 프로세스 얻기
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
 
#df를 생성하여 RDD로 변환
df_sample= spark.range(100)
df_sample_toRDD=df_sample.rdd
 
#sparksession에서 sparkContext를 불러 바로 RDD생성
df_from_sparksession=spark.sparkContext.parallelize(range(100))
 

 

Scala 버전

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row

// SparkSession 생성
val spark = SparkSession.builder()
  .appName("SparkSessionExample")
  .master("local[*]")
  .getOrCreate()

// SparkContext를 통해 RDD 생성
val rdd = spark.sparkContext.parallelize(Seq(Row(1, "Alice", 25), Row(2, "Bob", 30)))

// RDD 출력
rdd.collect().foreach(println)
// 출력:
// [1, Alice, 25]
// [2, Bob, 30]

 

 


3. RDD를 DF로 바꾸는 방법

RDD + Schema = DF
→ rdd + schema를 정의해주어야 DF로 변환가능

* 단 ROW로 구성된 RDD는 스키마 매핑없이 DF로 변환 가능
(Row로 구성된 RDD는 Spark가 스키마를 자동으로 추론할 수 있기 때문에, 별도의 스키마 매핑 없이 DataFrame으로 변환이 가능)

 

1) dataframe을 통해 만들어진 rdd를 다시 DF로 변환

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import findspark
findspark.init()
 
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
#sparksession 드라이버 프로세스 얻기
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
 
#df를 생성하여 RDD로 변환된 RDD를 다시 df로 바꿀 때(toDF())
#spark.range(100)로 만들어진 DF의 각 로우는 Row로써 표현된다.
df_sample= spark.range(100)

# Row로 구성된 RDD가 만들어 진다.
df_sample_toRDD=df_sample.rdd

# 그래서 스키마를 매핑해주지 않아도, scheama가 없는 DF로 생성 가능
df_sampe_toRDD_toDF=df_sample_toRDD.toDF()
 
cs

2) SparkSession의 createDataframe RDD 객체를 생성하고 그 객체를 DF로 변환

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import findspark
findspark.init()
 
from pyspark.sql import SparkSession
from pyspark.sql.types import Row,StructField,StructType,IntegerType
#sparksession 드라이버 프로세스 얻기
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
 
#공백을 기준으로 글자들을 list로 반환한다.
myCollection= "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
 
#sparksession의 sparkcontext를 이용하여 Row로 구성된 RDD를 생성한다.
rdd1 = spark.sparkContext.parallelize(Row(myCollection))
 
#만들어진 Row로 구성된 RDD를 DF로 변환한다.(schema가 없는 df가 생성됨)
df_rdd1 = spark.createDataFrame(rdd1)
 
 
#sparksession의 sparkContext를 이용하여 Row로 구성된 RDD를 생성한다.
rdd2 = spark.sparkContext.parallelize(Row((1,2,3,4,5,6)))
 
#df로 변환시 적용할 schema를 정의한다.
schema = StructType([
    StructField("col1",IntegerType(),True),
    StructField("col2",IntegerType(),True),
    StructField("col3",IntegerType(),True),
    StructField("col4",IntegerType(),True),
    StructField("col5",IntegerType(),True),
    StructField("col6",IntegerType(),True),
])
 
df_rdd2 = spark.createDataFrame(rdd2,schema)
cs

 


RDD vs PipelinedRDD의 차이

  • pyspark.rdd.RDD: 기본 RDD 타입으로 Spark에서 데이터를 분산 처리하기 위해 사용하는 가장 기본적인 데이터 구조입니다. RDD는 SparkContext에서 생성된 후 작업을 수행할 수 있음
  • pyspark.rdd.PipelinedRDD: 기본 RDD에 Transformation(예: map, filter, flatMap 등)이 적용된 결과로 생성된 RDD이며, 이 RDD는 Lazy Execution의 특성을 가지며, 실제 액션(예: collect, count, take)이 호출될 때 연산이 수행


map vs flatMap

둘 다 Spark에서 데이터를 변환할 때 사용되는 Transformation 함수

1) map

map은 주어진 함수에 의해 각 요소가 변환된 새로운 RDD를 생성

map 함수는 각각의 입력 데이터에 대해 하나의 출력 데이터를 반환하는 Transformation

  • 특징: 각 입력 요소에 대해 하나의 결과 값만 반환
  • 결과: map을 적용한 후, 결과 RDD는 원본 RDD와 같은 길이를 가지며, 각 원소가 변환
# RDD 생성
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# 각 숫자에 2를 곱하는 map 함수 적용
mapped_rdd = rdd.map(lambda x: x * 2)

# 결과 출력
mapped_rdd.collect()
# 결과: [2, 4, 6, 8, 10]

2) flatMap

flatMap은 map과 유사하지만, 중요한 차이점은 각 입력 요소에 대해 여러 개의 결과 값을 반환할 수 있음

즉, 출력 값이 하나의 데이터로 평평하게(flatten) 나열되기 때문에 입력 요소에 대해 여러 개의 값을 반환할 수 있음

  • 특징: 각 입력 요소에 대해 0개 이상의 결과 값을 반환할 수 있음
  • 결과: flatMap을 적용한 후, 결과 RDD는 길이가 달라질 수 있습니다. 예를 들어, 하나의 입력 요소에 대해 여러 개의 출력 요소가 생성될 수 있음
# RDD 생성
rdd = spark.sparkContext.parallelize([1, 2, 3])

# 각 숫자에 대해 0부터 해당 숫자까지의 숫자 리스트를 반환하는 flatMap 함수 적용
flat_mapped_rdd = rdd.flatMap(lambda x: range(x))

# 결과 출력
flat_mapped_rdd.collect()
# 결과: [0, 0, 1, 0, 1, 2]

728x90