본문 바로가기
BigData/Spark & Spark Tuning

[Spark] spark dataframe -> RDB로 적재하기

by 스파이디웹 2023. 7. 10.
728x90

오랜만에 글을 씁니다.

이직하고 나서 적응하느라 글쓸 기회가 없었는데, 그동안 어떤 공부를 해야될지 그리고 어떤 글부터 쓸지 순서를 정하느라 뜸했었습니다.

 

우선은 일을하면서 생긴 이슈와 겪은 업무들 위주로 정리하려고 합니다.

 


1. 배경

API 서빙을 위해 parquet파일을 RDB로 적재시켜야 했고, 실시간 API서빙이 가능했어야 하는 상황이였습니다.

 

따라서 spark을 이용하여 truncate없이 RDB로 적재시켜야 했습니다.

 

1) test db, table 생성

create table if not exists sparktordb.rdbtable(
id int(10),rdbtable
name varchar(30),
create_dt timestamp,
update_dt timestamp,
PRIMARY KEY(id)
);


2. Spark Dataframe overwrite + jdbc

처음에 고려해본 방법입니다.

INSERT,UPDATE 그리고 삭제된 데이터까지 다 반영이 가능한 overwrite하는 기능

import org.apache.spark.sql.SparkSession
import java.util.Properties

object ScalaSpark{

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ScalaTest").master("local[*]").getOrCreate()
    val df_export = spark.read.format("csv").option("header","true").load("C:/spark/export.csv")

    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")
    //df_export.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/sparktordb","rdbtable",properties)
    spark.stop()
    }
}

RDB에 insert된 dataframe의 데이터들
schema가 default인 text로 생성


[테이블의 schema를 유지한채로 truncate insert 하는 코드]

import org.apache.spark.sql.SparkSession
import java.util.Properties

object ScalaSpark{

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ScalaTest").master("local[*]").getOrCreate()
    val df_export = spark.read.format("csv").option("header","true").load("C:/spark/export.csv")

    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")
    //df_export.write.mode("overwrite").option("truncate","true").jdbc("jdbc:mysql://localhost:3306/sparktordb","rdbtable",properties)
    spark.stop()
    }
}

schema가 최초 생성할 때와 같은채로 유지

하지만 실시간으로 API를 제공해야되는 상황에서 truncate되는 시간동안 API를 제공할 수 없으므로, 해당방법은 사용하지못했습니다.


3. spark dataframe foreachPartition

그래서 foreachPartition + java DriverManager + pstmt로 직접 UPSERT 쿼리를 날리기로했습니다.

하지만 이 또한 delete가 반영되지 않는 문제는 있었지만, 데이터가 delete되지 않는다는 가정하에 이 방법으로 채택하였습니다.

이로써 실시간으로 API를 서빙할 수 있는 조건과 UPDATE, INSERT는 만족하게 되었습니다.

(cdc 없이 delete 된 데이터를 반영하는 방법을 알고 계신분은 댓글부탁드립니다.)

import org.apache.spark.sql.SparkSession

import java.sql.PreparedStatement
import java.util.Properties
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession, Dataset}

object ScalaSpark{

  val properties = new Properties()
  properties.put("user", "root")
  properties.put("password", "root")

  def export_to_mysql(): Unit = {
    val spark = SparkSession.builder.appName("ScalaTest").master("local[*]").getOrCreate()
    import spark.implicits._
    val df_export = spark.read.format("csv").option("header","true").load("C:/spark/export.csv")

    val upsert_df =
    df_export.foreachPartition {iterator: Iterator[Row] => 
      val dbc: Connection = DriverManager.getConnection("localhost:3306/sparktordb", "root", "root")
      val query = s"INSERT INTO rdbtable VALUES(?, ?, ?, ?) ON DUPLICATE KEY UPDATE id = VALUE(id), name = VALUE(name), create_dt = VALUE(create_dt), update_dt = VALUE(update_dt)"
      val pstmt: PreparedStatement = dbc.prepareStatement(query)

      try{
        while (iterator.hasNext) {
          val row = iterator.next()
          val id = row.getInt(row.fieldIndex("id"))
          val name = row.getString(row.fieldIndex("name"))
          val create_dt = row.getTimestamp(row.fieldIndex("create_dt"))
          val update_dt = row.getTimestamp(row.fieldIndex("update_dt"))

          pstmt.setInt(1, id)
          pstmt.setString(2, name)
          pstmt.setTimestamp(3, create_dt)
          pstmt.setTimestamp(4, update_dt)
          pstmt.addBatch()
        }
        pstmt.executeBatch()
      } finally {
        pstmt.close()
        dbc.close()
      }
    }

  }

  def main(args: Array[String]): Unit = {
    export_to_mysql()
    }
}

4. 남겨진 이슈

1) SparkSession을 매 함수마다 정의해야 하는 문제

SparkSession을 매 함수마다 정의해야 하는 문제, Zeppelin같은 notebook으로는 전역변수가 먹히지만, obect 안에 정의된 함수로는 SparkSession이 전역변수로 먹히지 않아서, 함수마다 Sparksession을 정의해줘야 했다.

(왜 그런지 아시는 분은 댓글 달아주시길 바랍니다.)

 

2) DELETE된 데이터를 반영시키는 방법

결국 실시간으로 데이터를 제공하면서, DELETE된 데이터를 반영시킬 방법이 계속 고민거리입니다. 해당방법을 SPARK가 아닌 다른 방법으로 가능한 방법이 있을지 더 찾아봐야겠습니다.

728x90

댓글