Apache Spark RDD, Dataframe을 DB(MySQL, PostgreSQL)에 저장하기


Apaceh Spark RDD, Dataframe을 MySQL, PostgreSQL에 저장하는 방법에 대해 작성한 글입니다. Mac 환경에서 작업했으며, Spark Version은 2.3.0입니다

작업 순서

  • RDD 생성
  • RDD to Dataframe
  • Dataframe to DB(MySQL, PostgreSQL)

필요한 Driver 설정

  • build.sbt에 libraryDependencies에 추가

    "mysql" % "mysql-connector-java" % "5.1.12",
    "postgresql" % "postgresql" % "9.1-901.jdbc4"
    

RDD to DB

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import java.util.Properties
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val appName = "rddSaveDatabase"
val master = "local[*]"
val conf = new SparkConf().setAppName(appName).setMaster(master)

val sc = SparkSession.builder
    .master(master)
    .appName(appName)
    .config("spark.some.config.option", "config-value")
    .getOrCreate()
    
val values = List("20180705", 1.0)

// Row 생성
val row = Row.fromSeq(values)

// SparkSession은 2.x 이후 엔트리 포인트로, 내부에 sparkContext를 가지고 있음
val rdd = sc.sparkContext.makeRDD(List(row))
  
val fields = List(
StructField("First Column", StringType, nullable = false),
StructField("Second Column", DoubleType, nullable = false)
)

val dataFrame = sc.createDataFrame(rdd, StructType(fields))

val properties = new Properties()
properties.put("user", "mysql_username")
properties.put("password", "your_mysql_password")

// to MySQL
dataFrame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/dbtest", "test", properties)
// to PostgreSQL
dataFrame.write.mode(SaveMode.Append).jdbc("jdbc:postgresql://localhost:5432/dbtest", "test", properties)

println("RDD Save to DB!")    
  • SaveMode.Append : DB에 추가
  • SaveMode.Overwrite : DB에 덮어쓰기

Dataframe to DB

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import java.util.Properties
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val appName = "dataframeSaveDatabase"
val master = "local[*]"
val conf = new SparkConf().setAppName(appName).setMaster(master)

val sc = SparkSession.builder
    .master(master)
    .appName(appName)
    .config("spark.some.config.option", "config-value")
    .getOrCreate()

import sc.implicits._    

val values = List(("zzsza", "2018-07-05", "2017-07-06"))
val dataFrame = values.toDF("user_id", "join_date", "event_date")

val properties = new Properties()
properties.put("user", "database_username")
properties.put("password", "your_database_password")

// to MySQL
dataFrame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/dbtest", "test", properties)
// to PostgreSQL
dataFrame.write.mode(SaveMode.Append).jdbc("jdbc:postgresql://localhost:5432/dbtest", "test", properties)

println("Dataframe Save to DB!")    

Reference


카일스쿨 유튜브 채널을 만들었습니다. 데이터 분석, 커리어에 대한 내용을 공유드릴 예정입니다.

PM을 위한 데이터 리터러시 강의를 만들었습니다. 문제 정의, 지표, 실험 설계, 문화 만들기, 로그 설계, 회고 등을 담은 강의입니다

이 글이 도움이 되셨거나 의견이 있으시면 댓글 남겨주셔요.

Buy me a coffeeBuy me a coffee





© 2017. by Seongyun Byeon

Powered by zzsza