BigData/Spark & Spark Tuning

[Spark] RDD vs DataFrame 큰 차이점, Dataframe을 사용해야 되는 이유, RDD를 사용해야 하는 경우는? RDD, Dataframe 다루기

스파이디웹 2021. 12. 16. 17:11
728x90

RDD란? 정의 및 특징, dataframe과의 차이는?

RDD(Resilient Distributed Dataset)

특징 설명
Resilient
RDD lineage 그래프를 통한 fault-tolerant가 빠졌거나,node의 실패로 인한 손상된 파티션을 다시 실행시킨다.
Distributed
클러스터의 여러 노드에 데이터가 분산되어 저장
Dataset 원천 데이터값 혹은 값의 값들로 이루어진 파티션된 collection 데이터
(튜플 혹은 다른 레코드로써 사용될 수 있는 데이터 객체들)

RDD 특징

특징 설명
In-Memory RDD속의 데이터는 가능한 많이, 오래 메모리에 저장되어 있다.
불가변성(읽기 전용) 한번 생성되고 나면 변하지 않는다. transformation 연산을 통해 새로운 RDD로써 만들 수 있다.
lazy evaluated(느린 실행) RDD속의 데이터는 action으로 분류되는 연산자가 실행되기 전까지는 실제 수행되거나 transform되지 않는다.
cacheable data를 캐싱할 수 있다.
Parallel(병행처리) 병행 처리가 가능하다.
Typed(타입존재) RDD 레코드는 타입을 가지고 있다. RDD[long],RDD[int,String]등등
Partitioned 레코드들은 논리적인 파티션으로 분할되어 있고 클러스터의 여러노드에 분산저장되어 있다.

  • RDD는 불변성을 가지며 병렬로 처리할 수 있는 파티셔닝된 레코드의 모음
  • RDD는 각 record가 로우라는 개념이아닌, 객체
  • 모든값을 다루거나 값 사이의 상호작용 과정을 반드시 수동으로 정의해야 하는 RDD
  • 동일한 공간 효율성과 성능을 얻으려면 객체에 포맷 타입을 구현해 모든 RDD연산 과정에서 사용해야 함
  • 레코드의 내부 구조를 스파크에서 파악할 수 없다.
  • DataFrame
    • Dataframe의 각 레코드는 스키마를 알고 있는 필드로 구성된 구조화된 로우
    • 자동으로 데이터를 최적화하고 압축된 바이너리 포맷으로 저장
    • 필터 재정렬, 집계같은 최적화 기법도 자동으로 수행

RDD In Scala vs Python

  • RDD in Scala
    • 스칼라와 자바는 대부분 비슷한 성능이지만, 원형 객체를 다룰 때는 큰 성능 손실이 발생
  • RDD in Python
    • RDD를 파이썬으로 실행하는 것은 파이썬으로 만들어진 사용자 정의 함수를 사용해 로우마다 적용하는것과 동일
    • 직렬화과정을 거친 데이터를 파이썬 프로세스에 전달하고, 파이썬 처리가 끝나면 다시 직렬화하여 자바 가상 머신에 반환. 파이썬을 이용한 RDD는 높은 오버헤드가 발생한다.

Dataframe대신에 RDD는 언제 사용하면 좋을까

 

  • RDD는 고수준 API에서 제공하지 않는 기능이 필요한 경우(scala,java에 해당) python은 성능 손실이 크다.
    ex)
    클러스터의 물리적 데이터의 배치를 아주 세밀하게 제어해야 하는 상황
  • RDD를 사용해 개발된 기존 코드를 유지해야 하는 경우
  • 사용자 정의 공유 변수를 다뤄야 하는 경우(broadcast 변수, accumulator)
  • 정말 필요한 경우가아니면 RDD 수동 생성 x
  • RDD는 구조적 API(Dataframe)이 제공하는 여러 최적화 기법을 사용할 수 없다.(Dataframe은 효율적, 안정적, 표현력이 좋다)
  • 물리적으로 분산된 데이터(자체적으로 구성한 데이터 파티셔닝)에 세부적인 제어가 필요할 때 RDD를 사용

RDD 생성하는 다양한 방법, RDD를 DF로 만드는 방법

1. SparkContext를 통해 생성하는방법

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import findspark
findspark.init()
 
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
 
conf = SparkConf()
conf.setMaster("local[*]").setAppName("PySpark")
 
sc = SparkContext(conf=conf)
 
#1 range를 통해 RDD생성
rdd1=sc.parallelize(range(500))
print(type(rdd1))
 
#2 row를 통해 RDD생성
from pyspark.sql import Row
 
data = [Row(name="James,,Smith",lang=["Java","Scala","C++"],state="CA"), 
    Row(name="Michael,Rose,",lang=["Spark","Java","C++"],state="NJ"),
    Row(name="Robert,,Williams",lang=["CSharp","VB"],state="NV")]
 
rdd2=sc.parallelize(data)
 
#3 string을 split시켜 list를 만들고 RDD생성
myCollection= "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
 
#parallelize의 2번째 인자는 partition 수를 지정할 때 쓰임
words = sc.parallelize(myCollection, 5)
 
#4 직접 list를 만들어 RDD생성
list1=['a','ds','ewrew','wq','b']
listrdd=sc.parallelize(list1)
 



2. SparkSession을 통해 RDD만드는 방법

1
2
3
4
5
6
7
8
9
10
11
12
13
import findspark
findspark.init()
 
from pyspark.sql import SparkSession
#sparksession 드라이버 프로세스 얻기
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
 
#df를 생성하여 RDD로 변환
df_sample= spark.range(100)
df_sample_toRDD=df_sample.rdd
 
#sparksession에서 sparkContext를 불러 바로 RDD생성
df_from_sparksession=spark.sparkContext.parallelize(range(100))
 

3. RDD를 DF로 바꾸는 방법

RDD를 DF로 바꾸기 위해선 가장 중요한 것이 RDD와 DF의 차이인 Row로 만들어진 RDD여야 한다는 것


RDD는 record가 Row가 아닌 그냥 객체의 개념이므로 튜플로만들어진 RDD,list로 만들어진 RDD, Row로 만들어진 RDD 모두 가능하지만, DF가 되기위해선 Row로 만들어진 RDD여야 한다.

 

1) dataframe을 통해 만들어진 rdd를 다시 DF로 변환

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import findspark
findspark.init()
 
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
#sparksession 드라이버 프로세스 얻기
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
 
#df를 생성하여 RDD로 변환된 RDD를 다시 df로 바꿀 때(toDF())
#spark.range(100)로 만들어진 DF의 각 로우는 Row로써 표현된다.
df_sample= spark.range(100)

#df를 rdd로 변환시키더라도 각 로우는 일반 객체가아닌 Row로써 구성되어 있다.
df_sample_toRDD=df_sample.rdd

#df로부터 만들어진 rdd는 Row로써 구성되어 있어 toDF()를 통하여 df로 만들 수 있다.
df_sampe_toRDD_toDF=df_sample_toRDD.toDF()
 
cs

2) SparkSession의 createDataframe RDD 객체를 생성하고 그 객체를 DF로 변환

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import findspark
findspark.init()
 
from pyspark.sql import SparkSession
from pyspark.sql.types import Row,StructField,StructType,IntegerType
#sparksession 드라이버 프로세스 얻기
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
 
#공백을 기준으로 글자들을 list로 반환한다.
myCollection= "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
 
#sparksession의 sparkcontext를 이용하여 Row로 구성된 RDD를 생성한다.
rdd1 = spark.sparkContext.parallelize(Row(myCollection))
 
#만들어진 Row로 구성된 RDD를 DF로 변환한다.(schema가 없는 df가 생성됨)
df_rdd1 = spark.createDataFrame(rdd1)
 
 
#sparksession의 sparkContext를 이용하여 Row로 구성된 RDD를 생성한다.
rdd2 = spark.sparkContext.parallelize(Row((1,2,3,4,5,6)))
 
#df로 변환시 적용할 schema를 정의한다.
schema = StructType([
    StructField("col1",IntegerType(),True),
    StructField("col2",IntegerType(),True),
    StructField("col3",IntegerType(),True),
    StructField("col4",IntegerType(),True),
    StructField("col5",IntegerType(),True),
    StructField("col6",IntegerType(),True),
])
 
df_rdd2 = spark.createDataFrame(rdd2,schema)
cs

 

 

728x90