Apache Spark RDD, Dataframe을 DB(MySQL, PostgreSQL)에 저장하기
in Data on Engineering
Apaceh Spark RDD, Dataframe을 MySQL, PostgreSQL에 저장하는 방법에 대해 작성한 글입니다. Mac 환경에서 작업했으며, Spark Version은 2.3.0입니다
작업 순서
- RDD 생성
- RDD to Dataframe
- Dataframe to DB(MySQL, PostgreSQL)
필요한 Driver 설정
build.sbt
에 libraryDependencies에 추가"mysql" % "mysql-connector-java" % "5.1.12", "postgresql" % "postgresql" % "9.1-901.jdbc4"
RDD to DB
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import java.util.Properties
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val appName = "rddSaveDatabase"
val master = "local[*]"
val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc = SparkSession.builder
.master(master)
.appName(appName)
.config("spark.some.config.option", "config-value")
.getOrCreate()
val values = List("20180705", 1.0)
// Row 생성
val row = Row.fromSeq(values)
// SparkSession은 2.x 이후 엔트리 포인트로, 내부에 sparkContext를 가지고 있음
val rdd = sc.sparkContext.makeRDD(List(row))
val fields = List(
StructField("First Column", StringType, nullable = false),
StructField("Second Column", DoubleType, nullable = false)
)
val dataFrame = sc.createDataFrame(rdd, StructType(fields))
val properties = new Properties()
properties.put("user", "mysql_username")
properties.put("password", "your_mysql_password")
// to MySQL
dataFrame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/dbtest", "test", properties)
// to PostgreSQL
dataFrame.write.mode(SaveMode.Append).jdbc("jdbc:postgresql://localhost:5432/dbtest", "test", properties)
println("RDD Save to DB!")
SaveMode.Append
: DB에 추가SaveMode.Overwrite
: DB에 덮어쓰기
Dataframe to DB
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import java.util.Properties
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val appName = "dataframeSaveDatabase"
val master = "local[*]"
val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc = SparkSession.builder
.master(master)
.appName(appName)
.config("spark.some.config.option", "config-value")
.getOrCreate()
import sc.implicits._
val values = List(("zzsza", "2018-07-05", "2017-07-06"))
val dataFrame = values.toDF("user_id", "join_date", "event_date")
val properties = new Properties()
properties.put("user", "database_username")
properties.put("password", "your_database_password")
// to MySQL
dataFrame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/dbtest", "test", properties)
// to PostgreSQL
dataFrame.write.mode(SaveMode.Append).jdbc("jdbc:postgresql://localhost:5432/dbtest", "test", properties)
println("Dataframe Save to DB!")
Reference
카일스쿨 유튜브 채널을 만들었습니다. 데이터 분석, 커리어에 대한 내용을 공유드릴 예정입니다.
PM을 위한 데이터 리터러시 강의를 만들었습니다. 문제 정의, 지표, 실험 설계, 문화 만들기, 로그 설계, 회고 등을 담은 강의입니다
이 글이 도움이 되셨거나 의견이 있으시면 댓글 남겨주셔요.