728x90
RDD란? 정의 및 특징, dataframe과의 차이는?
RDD(Resilient Distributed Dataset)
특징 | 설명 |
Resilient | RDD lineage 그래프를 통한 fault-tolerant가 빠졌거나,node의 실패로 인한 손상된 파티션을 다시 실행시킨다. |
Distributed | 클러스터의 여러 노드에 데이터가 분산되어 저장 |
Dataset | 원천 데이터값 혹은 값의 값들로 이루어진 파티션된 collection 데이터 (튜플 혹은 다른 레코드로써 사용될 수 있는 데이터 객체들) |
RDD 특징
특징 | 설명 |
In-Memory | RDD속의 데이터는 가능한 많이, 오래 메모리에 저장되어 있다. |
불가변성(읽기 전용) | 한번 생성되고 나면 변하지 않는다. transformation 연산을 통해 새로운 RDD로써 만들 수 있다. |
lazy evaluated(느린 실행) | RDD속의 데이터는 action으로 분류되는 연산자가 실행되기 전까지는 실제 수행되거나 transform되지 않는다. |
cacheable | data를 캐싱할 수 있다. |
Parallel(병행처리) | 병행 처리가 가능하다. |
Typed(타입존재) | RDD 레코드는 타입을 가지고 있다. RDD[long],RDD[int,String]등등 |
Partitioned | 레코드들은 논리적인 파티션으로 분할되어 있고 클러스터의 여러노드에 분산저장되어 있다. |
- RDD는 불변성을 가지며 병렬로 처리할 수 있는 파티셔닝된 레코드의 모음
- RDD는 각 record가 로우라는 개념이아닌, 객체
- 모든값을 다루거나 값 사이의 상호작용 과정을 반드시 수동으로 정의해야 하는 RDD
- 동일한 공간 효율성과 성능을 얻으려면 객체에 포맷 타입을 구현해 모든 RDD연산 과정에서 사용해야 함
- 레코드의 내부 구조를 스파크에서 파악할 수 없다.
- DataFrame
- Dataframe의 각 레코드는 스키마를 알고 있는 필드로 구성된 구조화된 로우
- 자동으로 데이터를 최적화하고 압축된 바이너리 포맷으로 저장
- 필터 재정렬, 집계같은 최적화 기법도 자동으로 수행
RDD In Scala vs Python
- RDD in Scala
- 스칼라와 자바는 대부분 비슷한 성능이지만, 원형 객체를 다룰 때는 큰 성능 손실이 발생
- RDD in Python
- RDD를 파이썬으로 실행하는 것은 파이썬으로 만들어진 사용자 정의 함수를 사용해 로우마다 적용하는것과 동일
- 직렬화과정을 거친 데이터를 파이썬 프로세스에 전달하고, 파이썬 처리가 끝나면 다시 직렬화하여 자바 가상 머신에 반환. 파이썬을 이용한 RDD는 높은 오버헤드가 발생한다.
Dataframe대신에 RDD는 언제 사용하면 좋을까
- RDD는 고수준 API에서 제공하지 않는 기능이 필요한 경우(scala,java에 해당) python은 성능 손실이 크다.
ex)
클러스터의 물리적 데이터의 배치를 아주 세밀하게 제어해야 하는 상황 - RDD를 사용해 개발된 기존 코드를 유지해야 하는 경우
- 사용자 정의 공유 변수를 다뤄야 하는 경우(broadcast 변수, accumulator)
- 정말 필요한 경우가아니면 RDD 수동 생성 x
- RDD는 구조적 API(Dataframe)이 제공하는 여러 최적화 기법을 사용할 수 없다.(Dataframe은 효율적, 안정적, 표현력이 좋다)
- 물리적으로 분산된 데이터(자체적으로 구성한 데이터 파티셔닝)에 세부적인 제어가 필요할 때 RDD를 사용
RDD 생성하는 다양한 방법, RDD를 DF로 만드는 방법
1. SparkContext를 통해 생성하는방법
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import findspark
findspark.init()
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
conf = SparkConf()
conf.setMaster("local[*]").setAppName("PySpark")
sc = SparkContext(conf=conf)
#1 range를 통해 RDD생성
rdd1=sc.parallelize(range(500))
print(type(rdd1))
#2 row를 통해 RDD생성
from pyspark.sql import Row
data = [Row(name="James,,Smith",lang=["Java","Scala","C++"],state="CA"),
Row(name="Michael,Rose,",lang=["Spark","Java","C++"],state="NJ"),
Row(name="Robert,,Williams",lang=["CSharp","VB"],state="NV")]
rdd2=sc.parallelize(data)
#3 string을 split시켜 list를 만들고 RDD생성
myCollection= "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
#parallelize의 2번째 인자는 partition 수를 지정할 때 쓰임
words = sc.parallelize(myCollection, 5)
#4 직접 list를 만들어 RDD생성
list1=['a','ds','ewrew','wq','b']
listrdd=sc.parallelize(list1)
|
2. SparkSession을 통해 RDD만드는 방법
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import findspark
findspark.init()
from pyspark.sql import SparkSession
#sparksession 드라이버 프로세스 얻기
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
#df를 생성하여 RDD로 변환
df_sample= spark.range(100)
df_sample_toRDD=df_sample.rdd
#sparksession에서 sparkContext를 불러 바로 RDD생성
df_from_sparksession=spark.sparkContext.parallelize(range(100))
|
3. RDD를 DF로 바꾸는 방법
RDD를 DF로 바꾸기 위해선 가장 중요한 것이 RDD와 DF의 차이인 Row로 만들어진 RDD여야 한다는 것
RDD는 record가 Row가 아닌 그냥 객체의 개념이므로 튜플로만들어진 RDD,list로 만들어진 RDD, Row로 만들어진 RDD 모두 가능하지만, DF가 되기위해선 Row로 만들어진 RDD여야 한다.
1) dataframe을 통해 만들어진 rdd를 다시 DF로 변환
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
#sparksession 드라이버 프로세스 얻기
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
#df를 생성하여 RDD로 변환된 RDD를 다시 df로 바꿀 때(toDF())
#spark.range(100)로 만들어진 DF의 각 로우는 Row로써 표현된다.
df_sample= spark.range(100)
#df를 rdd로 변환시키더라도 각 로우는 일반 객체가아닌 Row로써 구성되어 있다.
df_sample_toRDD=df_sample.rdd
#df로부터 만들어진 rdd는 Row로써 구성되어 있어 toDF()를 통하여 df로 만들 수 있다.
df_sampe_toRDD_toDF=df_sample_toRDD.toDF()
|
cs |
2) SparkSession의 createDataframe RDD 객체를 생성하고 그 객체를 DF로 변환
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import Row,StructField,StructType,IntegerType
#sparksession 드라이버 프로세스 얻기
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
#공백을 기준으로 글자들을 list로 반환한다.
myCollection= "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
#sparksession의 sparkcontext를 이용하여 Row로 구성된 RDD를 생성한다.
rdd1 = spark.sparkContext.parallelize(Row(myCollection))
#만들어진 Row로 구성된 RDD를 DF로 변환한다.(schema가 없는 df가 생성됨)
df_rdd1 = spark.createDataFrame(rdd1)
#sparksession의 sparkContext를 이용하여 Row로 구성된 RDD를 생성한다.
rdd2 = spark.sparkContext.parallelize(Row((1,2,3,4,5,6)))
#df로 변환시 적용할 schema를 정의한다.
schema = StructType([
StructField("col1",IntegerType(),True),
StructField("col2",IntegerType(),True),
StructField("col3",IntegerType(),True),
StructField("col4",IntegerType(),True),
StructField("col5",IntegerType(),True),
StructField("col6",IntegerType(),True),
])
df_rdd2 = spark.createDataFrame(rdd2,schema)
|
cs |
728x90
댓글