728x90
출처 : A comparison between RDD, DataFrame and Dataset in Spark from a developer’s point of view
RDD
- Resilient: 분산되어 있는 데이터에 에러가 생겨도 복구할 수 있는 능력
- Distributed: 클러스터의 여러 노드에 데이터를 분산해서 저장
- Dataset: 분산된 데이터의 모음
SparkContext를 통해 만들어지며 보통 sc=spark.SparkContext를 통해 객체를 생성한다
SparkSession을 import해도 SparkContext가 포함되어 있는 내용이라 SparkSession만 import해도 사용가능하긴 하다.
ex)
spark = SparkSession.builder.appName("이름").master("local[*]").getOrCreate()
sc = spark.SparkContext
lines = sc.textfile("주소".파일형식) #lines는 RDD타입으로 만들어지게 된다.
RDD를 사용하는 3가지 상황
- 높은 레벨의 API에서 찾을 수 없는 어떠한 기능이 필요할 때 사용(mysql의 upsert와 같은 기능),(map함수 내지 특정 기능사용)
ex) 클러스터를 통해 물리적 데이터선정을 빡빡하게 제어해야될 때) - RDD를 기반으로 짠 기존 코드를 유지해야 할 때
- shared variable 조작을 설정해야될 때.
RDD 특징
- Java, Scala의 객체를 처리하는 방식
- 함수를 1) Transformation 2) Action으로 나눠 Action에 해당하는 함수를 호출할 때 실행된다.
- transformation의 결과는 RDD로 생성
- 내부에 데이터 타입이 명시
- 쿼리 최적화등을 지원하지 않았음(카탈리스트 옵티마이저 X)
- Pyspark에서 UDF선언시 인터프리터와 JVM사이 커뮤니케이션으로 속도가 저하됨( Python의 경우 Scala의 2배), 반드시 built in 함수만 쓸 것
- 병렬적으로 처리한다
- immutable(수정할 수 없다)
- lineage를 통한 fault tolerent를 보장함으로써 속도가 빠르다.
- Transformation:
RDD에서 새로운 RDD를 생성하는 함수,
스파크의 동작중에서 데이터를 처리하는 명령
ex)
map, filter, flatMap, join - Action:
RDD에서 RDD가 아닌 타입의 data로 변환하는 함수,
Transformation의 결과를 저장하는 명령 - action이 일어날 때, transformation함수가 memory에 비로소 올라오게 된다.
ex)
count, collect, reduce, save
RDD의 개념
www.slideshare.net/yongho/rdd-paper-review
DataFrame
- SparkSession을 통해 만들어지는 데이터 타입
- 대부분의 경우에 dataframe을 사용(테이블 생성을 통한 SQL질의도 가능하기 때문)
spark = SparkSession.builder.appName("이름").master("local[*]").getOrCreate()
df = spark.read.format("형식").option("옵션","true").load("주소.파일형식")
#df는 dataframe타입으로 만들어지게 된다.
dataframe 특징
- 내부 데이터가 Row라는것만 명시, 실제 데이터 타입은 알 수 없음
- 스키마 추상화
- Python Wrapper코드로 Python에서의 성능이 향상되었다.
- Dataset 제네릭 객체 집합에 대한 별칭
- RDD와 마찬가지로 Pyspark에서 함수를 선언해서 사용할 경우(UDF), 속도 저하의 원인이 될 수 있다.
- 카탈리스트 옵티마이저 지원
- 테이블과 같이 구성되어 있기 때문에, SparkSQL을 통해 사용할 수 있다(RDBMS에 익숙한 사람들의 경우 편리함)
- RDB처럼 테이블을 가질 수 있으며, 테이블에 대한 연산이 가능하다.
- Named Column으로 구성
- immutable(수정될 수 없다.)
- lineage를 통해 lazy하게 작동한다.(계산 혹은 기능에 대한 계보를 저장하고 바로 실행되지는 않는다.즉 결과를 바로 돌려주지않는다) -> transformation과 action의 분할
#Dataframe은 python pandas 에 쓰이는 dataframe과 유사하지만 완전히 같지는 않다.
Dataset
사용 되는 경우
- 데이터프레임 조작을 통해 원하는 수행을 할 수 없을 때.
- type-saftey를 원할때, type-safety를 대가로 그만한 비용(자원)을 감당할 수 있을 때
(dataframe은 dataset의row타입이다.)
파이썬은 dataset을 지원하지 않는대신 from pyspark.sql import Row를 통해 대부분 지원을 한다.
dataset 특징
- 데이터타입이 명시되어야함
- 스키마 추상화
- Scala/Java에서 정의한 클래스에 의해 타입을 명시해야하는 JVM객체. 따라서 Python은 지원하지 않음.
- 카탈리스트 옵티마이저 지원
- Type-safe
- RDD와 Dataframe의 장점을 취한다.
SparkSQL
- dataset에서 실행되는 SQL쿼리를 이용하는 구조화된 데이터 처리 모듈이며, ANSI SQL, HIVEQL 문법을 따라 감(자체 개발된 SQL parser)
- 데이터베이스에 생성된 뷰나 테이블에 SQL쿼리를 실행할 수 있음
- 기본적으로 내장된 데이터베이스를 사용할 수 있으며(DB명: default), Glue, HiveMetastore와 연동하여 해당 테이블정보를 쿼리할 수 있음
- spark.sql.hive.metastore.version
- spark.sql.hive.metastore.jars
- spark.sql.hive.metastore.sharedPrefixes
IAM Key값이 저장된 Secret을 불러와 Glue Catalog와 연동하는 방법
import org.apache.spark.sql.SparkSession
import com.amazonaws.services.secretsmanager._
import com.amazonaws.services.secretsmanager.model._
import scala.util.parsing.json.JSON
object GlueTableQueryWithSecretApp {
def main(args: Array[String]): Unit = {
// AWS Secrets Manager에서 IAM Key 불러오기
def getSecret(secretName: String, region: String): Map[String, String] = {
val client = AWSSecretsManagerClientBuilder.standard()
.withRegion(region)
.build()
val request = new GetSecretValueRequest().withSecretId(secretName)
val response = client.getSecretValue(request)
val secretString = response.getSecretString
JSON.parseFull(secretString).get.asInstanceOf[Map[String, String]]
}
// Secret Manager에서 IAM 정보 가져오기
val secretName = "your-secret-name"
val region = "ap-northeast-2"
val secrets = getSecret(secretName, region)
val accessKeyId = secrets("aws_access_key_id")
val secretAccessKey = secrets("aws_secret_access_key")
// SparkSession 생성
val spark = SparkSession.builder()
.appName("GlueTableQuery")
.config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.glue_catalog.warehouse", "s3://your-glue-bucket/")
.config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
.config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.config("spark.hadoop.fs.s3a.access.key", accessKeyId) // IAM 설정
.config("spark.hadoop.fs.s3a.secret.key", secretAccessKey)
.config("spark.hadoop.fs.s3a.endpoint", s"s3.$region.amazonaws.com")
.config("spark.sql.defaultCatalog", "glue_catalog")
.enableHiveSupport()
.getOrCreate()
// Glue 테이블 쿼리
val databaseName = "your_database_name"
val tableName = "your_table_name"
val df = spark.sql(s"SELECT * FROM glue_catalog.$databaseName.$tableName")
df.show(10)
spark.stop()
}
}
- SparkSQL은 Dataframe과 Dataset API에 통합되어 있음 → 데이터 변환 시 SQL과 Dataframe의 기능을 모두 사용할 수 있으며, 두 방식 모두 동일한 실행 코드로 컴파일 됨
Spark SQL Thrift JDBC/ODBC server
스파크는 자바 데이터베이스 연결(JDBC) 인터페이스를 제공
사용자나 원격 프로그램은 spark sql을 실행하기 위해 이 인터페이스로 spark driver에 접속(tableau가 대표적인 예시)
thrift JDBC/ODBC서버는 하이브 1.2.1 버전의 HiveServer2에 맞추어 구현되어 있
아파치 쓰리프트는 페이스북에서 서로 다른 언어간의 통신을 위하여 개발되었다. 원격 프로시저 호출(Remote Procedure Call)로 언어에 상관 없이 서로 통신할 수 있도록 도와준다.
예를 들면 PHP에서 작성한 기능을 파이썬과 Go언어에서 자유롭게 호출해서 사용할 수 있다. 단순히 하나의 함수 호출이 아니라, REST API 서버처럼 자유롭게 개발을 할 수 있다.
thrift와 호환되는 언어들은 모두 비슷하지만 각자 다른 인터페이스 정의 규칙을 가지고 있다.
아파치 쓰리프트에서는 .thrift 파일에 변수의 타입과 이름 그리고 함수의 매개변수, 반환값과 예외 등의 정의를 하고, thrift를 사용하여 옵션으로 변환하려는 언어를 적어주면 .thrift 파일에 입력된 정의로 각 언어에 맞게 코드를 생성해준다.
참조:
728x90
댓글