Apache SparkSQL과 Dataframe
in Data on Engineering
SparkSQL과 Dataframe에 대한 포스팅입니다!
SparkSQL
- SQL을 원하는 이유
- 익숙한 언어인 SQL로 데이터를 분석하고 싶어서
- 맵리듀스보다 빠르고 편함
- 정형화된 데이터의 경우 높은 최적화를 구현 가능
- Hive의 경쟁자
- Spark 2.0은 Spark SQL을 위한 업데이트
- 더 많은 쿼리와 파일포맷 지원 강화
- 현재 Low-level API인 RDD와 공존, 앞으로 Dataset API쪽으로도 무게가 실릴 수도!
- 머신러닝은 정형화된 데이터셋을 주로 다루기 때문에 Dataframe API로 다시 쓰여짐
- SparkSQL Programming Guide
Spark Dataset, Dataframe API
- SparkSQL은 구조화된 데이터가 필요
Dataset[Row] = Dataframe
- Json 파일에서 스키마를 읽어 Dataframe을 만듬
- RDD -> DS, DF로 변환 가능
- MLlib에서도 구조화된 데이터를 다루고 언어별 통일성 등의 장점을 취하기 위해 Dataframe 사용
- Dataset(Dataframe) Document
- Functions : 들어가서 함수 Check! agg 안에 넣을 수 있는 함수들입니다
- Dataset은 여러 개의 row를 가질 수 있는 가상의 리스트
- Dataset
to represent a DataFrame
- Dataset
Dataframe 실습
val df = spark.read.option("header", "true").csv("character-deaths.csv")
df.show(10, true) // true : 내용이 길면 ...화 되는 것! false로 하면 풀네임이 나옴
// API 문서찾을 때 Dataset을 찾아보면 됩니다!!
df.count
// TempView 생성
df.createOrReplaceTempView("game_of_throne")
%sql
select *
from game_of_throne
%sql
select Allegiances, count(1)
from game_of_throne
group by Allegiances
// sql로 표현한 것을 df에서 진행하면 아래와 같음
df.groupBy("Allegiances").count.show
df.groupBy("Allegiances").agg(count("Name")).show
df.groupBy("Allegiances").agg(count("Name"), collect_list("Name")).show(false)
df.groupBy("Allegiances").agg(count($"Name"), sum("Gender"), sum("Nobility")).show(false)
%sql
select *
from game_of_throne
// %sql은 제플린에서만 지원되는 기능
// 제플린이 아니라면 spark.sql("SELECT * FROM game_of_throne")
// 바차트를 선택한 후, settings를 클릭해서 key와 그룹을 조절할 수 있음
// error 발생
df.filter("Allegiances" == "Baratheon").show
// ===를 사용, 앞의 $"Allegiances" : column name이 됨
df.filter($"Allegiances" === "Baratheon").show
// Death Year 역순 정렬
df.filter($"Allegiances" === "Baratheon").orderBy($"Death Year".desc).show
// 특정 가문의 성별 count
df.filter($"Allegiances" === "Baratheon").groupBy("Gender").count.show
// select
df.filter($"Allegiances" === "Baratheon").select($"Name", $"Death Year").show
// 파일 쓰기
val df2 = df.filter($"Allegiances" === "Baratheon").select($"Name", $"Death Year")
df2.write.csv("df2")
// 하둡에 저장하는 것을 기본으로 해서, 읽기 쓰기는 쉽지만 삭제 및 수정이 어려움
// mode("overwrite")로 가능
df2.write.mode("overwrite").option("header", "true").csv("df2")
df2.write.mode("overwrite").option("header", "true").json("df2")
// colum 2개를 삭제하고 Gender2를 복사, Gender가 1이냐 아니냐로 나눔
df.drop("GoT", "CoK").withColumn("Gender2", $"Gender" ===1).show
// 성별을 남녀로 하고싶다면 UDF를 사용해야 합니다
// UDF 정의
// SQL용
sqlContext.udf.register("genderString", (gender: int => if (gender ==1) "Male" else "Female")
// Dataframe용 : 2개가 나뉨.. 과도기라서 둘 다 알아야 합니다
val genderString = udf((gender: Int) => if (gender ==1) "Male" else "Female")
// UDF를 사용해서 성별을 Male, Female로 나옴
df.drop("GoT", "CoK").withColumn("Gender2", genderString($"Gender")).show
// join
// https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.Dataset 참고
val character_deaths = spark.read.option("header", "true").csv("/Users/byeon/Downloads/character-deaths.csv")
val character_predictions = spark.read.option("header", "true").csv("/Users/byeon/Downloads/character-predictions.csv")
character_deaths.join(character_predictions, "name").show
RDD 복습
val rdd = sc.textFile("/Users/byeon/Downloads/character-deaths.csv").map(t => t.split(",")).map {
a => (a(0), a(1), a(2))
}
rdd.take(5).foreach(println)
// Tuple로 관리하는 것은 조금 번거로워서 case class를 만듬
case class Character(name: String, allegiances: String, deathYear: Option[Int], isDeath: Boolean)
val rdd = sc.textFile("/Users/byeon/Downloads/character-deaths.csv").map(t => t.split(",")).filter(a => a(0) != "Name").map {
a =>
val deathYear = if (a(2) !="") Some(a(2).toInt) else None
val isDeath = !deathYear.isEmpty
Character (a(0), a(1), deathYear, isDeath)
}
// rdd를 사용하려면 파싱을 해줘야 함. dataframe은 부가작업이 존재하지 않음
rdd.filter(_.allegiances =="Lannister").take(10).foreach(println)
// RDD To Dataset Dataframe
rdd.toDS
rdd.toDF("Name", "Allegiances", "DeathYear", "IsDeath")
Dataset 실습
- Dataframe과 RDD의 중간 느낌
case class Character2(Name: String, Allegiances: String)
val ds = character_deaths.as[Character2]
// character_deaths의 character2를 가지고 있는 Dataset
// dataframe에선 filter 하려면
// untyped api
ds.filter($"Name" === "Addam Marbrand").show
// dataset에선
// typed api
ds.filter(_.Name == "Addam Marbrand").show
// 에러 발생(Character2에서 정의를 안했기 때문에)
ds.filter(_.GoT == "Addam Marbrand").show
// 이런 방식으로 사용 가능
ds.filter(ds("Name") === "Addam Marbrand").show
// join을 했을 때 어느 컬럼인지 헷갈릴 수 있음. 이 경우 사용!
// Dataset to RDD
ds.rdd.map {
_.Name.length
}.collect
카일스쿨 유튜브 채널을 만들었습니다. 데이터 분석, 커리어에 대한 내용을 공유드릴 예정입니다.
PM을 위한 데이터 리터러시 강의를 만들었습니다. 문제 정의, 지표, 실험 설계, 문화 만들기, 로그 설계, 회고 등을 담은 강의입니다
이 글이 도움이 되셨거나 의견이 있으시면 댓글 남겨주셔요.