728x90
이직을 하게 되면서 python이 아닌, scala 언어로 Spark을 돌리고 있었습니다.
언젠가는 scala를 배워봐야지 생각만 했었는데, 생각보다 기회가 앞당겨진 것 같아, 요즘 열심히 scala 언어를 공부하고 있는 중입니다.
그래서 이번 포스트에는 scala언어로 spark앱을 만들고, jar파일로 만들어 보겠습니다.
1. 프로젝트 생성
스칼라 새 프로젝트 생성
build 시스템은 sbt로 지정
settings에서 scala plugins생성
.
├── build.sbt # <-- 1
├── project
│ ├── build.properties
│ ├── plugins.sbt # <-- 2 처음에는 없습니다!
│ ├── project
│ └── target
├── src
│ ├── main # <-- 3
│ └── test
└── target
├── global-logging
├── scala-2.12 # <-- 여기에 jar가 생성됩니다.
├── streams
└── task-temp-directory
build.properties
build.sbt
버전, 환경세팅 관련해서 여기에 정의
name := "MySparkApp" //프로젝트명
version := "0.1"
scalaVersion := "2.12.0"
val sparkVersion = "3.1.2" //현재 설치 버전
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
위의 버튼을 눌러 빌드
Test Spark Object만들기
import org.apache.spark.sql.SparkSession
object ScalaSpark{
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("ScalaTest").master("local[*]").getOrCreate()
val df_ny_taxi = spark.read.format("csv").option("header", "true").load("C:/Users/jiho3/Downloads/Motor_Vehicle_Collisions_-_Crashes_update.csv")
df_ny_taxi.show()
}
}
실행 시켜줍니다.
Build 해서 Jar 파일 만들기
1. sbt다운로드
https://www.scala-sbt.org/download.html
위의 링크에서 build.properties에 명시된 sbt version으로 다운로드 받습니다.(저는 1.3.10)
2. assembly.sbt 작성
3. build.sbt 작성
name := "MySparkApp" //프로젝트명
version := "0.1"
scalaVersion := "2.12.15"
val sparkVersion = "3.1.2" //현재 설치 버전
assemblyOption in assembly := (assemblyOption in assembly).value.withIncludeScala(false)
assemblyJarName in assembly := name.value + "_" + version.value + ".jar"
assemblyMergeStrategy in assembly := {
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
case PathList(pl @ _*) if pl.contains("log4j.properties") => MergeStrategy.concat
case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
case PathList("/module-info.class") => MergeStrategy.discard
case x if x.endsWith("modlue-info.class") => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
val sparkDependencies = Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
("org.apache.spark" %% "spark-core" % sparkVersion % "provided").
exclude("org.scala-lang", "scala-library")
)
libraryDependencies ++= sparkDependencies
4. EMR 에 제출할 spark 파일 작성하기
import org.apache.spark.sql.SparkSession
object ScalaSpark{
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("ScalaTest").master("local[*]").getOrCreate()
val df_ny_taxi = spark.read.format("csv").option("header", "true").load("s3://ny-vehicle/source/Motor_Vehicle_Collisions_-_Crashes_update.csv")
df_ny_taxi.coalesce(1).write.mode("overwrite").format("parquet").save("s3://ny-vehicle/target/")
}
}
5. spark 파일 fat jar파일로 bulid하기
intellij 터미널에서 sbt assembly
728x90
'BigData > Spark & Spark Tuning' 카테고리의 다른 글
[Spark Tuning] Spark 3.x 버전 특징 정리 (0) | 2023.08.10 |
---|---|
[Spark] spark dataframe -> RDB로 적재하기 (2) | 2023.07.10 |
[Spark Tuning] CSV vs Parquet(columnar format) in spark 성능비교하기 (0) | 2023.01.25 |
[Spark Tuning] PartitionFilters vs PushedFilter 비교, predicate pushdown vs projection pushdown (0) | 2023.01.01 |
[Spark] PySpark read & write + partitioning 간단한 예시문제 (2) | 2022.08.23 |
댓글