Apache SparkSQL과 Dataframe


SparkSQL과 Dataframe에 대한 포스팅입니다!

SparkSQL


  • SQL을 원하는 이유
    • 익숙한 언어인 SQL로 데이터를 분석하고 싶어서
    • 맵리듀스보다 빠르고 편함
    • 정형화된 데이터의 경우 높은 최적화를 구현 가능
  • Hive의 경쟁자
  • Spark 2.0은 Spark SQL을 위한 업데이트
    • 더 많은 쿼리와 파일포맷 지원 강화
  • 현재 Low-level API인 RDD와 공존, 앞으로 Dataset API쪽으로도 무게가 실릴 수도!
  • 머신러닝은 정형화된 데이터셋을 주로 다루기 때문에 Dataframe API로 다시 쓰여짐
  • SparkSQL Programming Guide

Spark Dataset, Dataframe API


  • SparkSQL은 구조화된 데이터가 필요
  • Dataset[Row] = Dataframe
  • Json 파일에서 스키마를 읽어 Dataframe을 만듬
  • RDD -> DS, DF로 변환 가능
  • MLlib에서도 구조화된 데이터를 다루고 언어별 통일성 등의 장점을 취하기 위해 Dataframe 사용
  • Dataset(Dataframe) Document
  • Functions : 들어가서 함수 Check! agg 안에 넣을 수 있는 함수들입니다
  • Dataset은 여러 개의 row를 가질 수 있는 가상의 리스트
    • Dataset to represent a DataFrame

Dataframe 실습


val df = spark.read.option("header", "true").csv("character-deaths.csv")

df.show(10, true) // true : 내용이 길면 ...화 되는 것! false로 하면 풀네임이 나옴
// API 문서찾을 때 Dataset을 찾아보면 됩니다!!


df.count

// TempView 생성
df.createOrReplaceTempView("game_of_throne")


%sql
select *
from game_of_throne

%sql
select Allegiances, count(1)
from game_of_throne
group by Allegiances


// sql로 표현한 것을 df에서 진행하면 아래와 같음
df.groupBy("Allegiances").count.show

df.groupBy("Allegiances").agg(count("Name")).show



df.groupBy("Allegiances").agg(count("Name"), collect_list("Name")).show(false)


df.groupBy("Allegiances").agg(count($"Name"), sum("Gender"), sum("Nobility")).show(false)



%sql
select *
from game_of_throne
// %sql은 제플린에서만 지원되는 기능
// 제플린이 아니라면 spark.sql("SELECT * FROM game_of_throne")


// 바차트를 선택한 후, settings를 클릭해서 key와 그룹을 조절할 수 있음

// error 발생
df.filter("Allegiances" == "Baratheon").show 

// ===를 사용, 앞의 $"Allegiances" : column name이 됨
df.filter($"Allegiances" === "Baratheon").show

// Death Year 역순 정렬
df.filter($"Allegiances" === "Baratheon").orderBy($"Death Year".desc).show


// 특정 가문의 성별 count
df.filter($"Allegiances" === "Baratheon").groupBy("Gender").count.show


// select
df.filter($"Allegiances" === "Baratheon").select($"Name", $"Death Year").show

// 파일 쓰기
val df2 = df.filter($"Allegiances" === "Baratheon").select($"Name", $"Death Year")

df2.write.csv("df2")


// 하둡에 저장하는 것을 기본으로 해서, 읽기 쓰기는 쉽지만 삭제 및 수정이 어려움
// mode("overwrite")로 가능
df2.write.mode("overwrite").option("header", "true").csv("df2")

df2.write.mode("overwrite").option("header", "true").json("df2")


// colum 2개를 삭제하고 Gender2를 복사, Gender가 1이냐 아니냐로 나눔
df.drop("GoT", "CoK").withColumn("Gender2", $"Gender" ===1).show

// 성별을 남녀로 하고싶다면 UDF를 사용해야 합니다
// UDF 정의
// SQL용
sqlContext.udf.register("genderString", (gender: int => if (gender ==1) "Male" else "Female")

// Dataframe용 : 2개가 나뉨.. 과도기라서 둘 다 알아야 합니다
val genderString = udf((gender: Int) => if (gender ==1) "Male" else "Female")

// UDF를 사용해서 성별을 Male, Female로 나옴
df.drop("GoT", "CoK").withColumn("Gender2", genderString($"Gender")).show


// join
// https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.Dataset 참고
val character_deaths = spark.read.option("header", "true").csv("/Users/byeon/Downloads/character-deaths.csv")
val character_predictions = spark.read.option("header", "true").csv("/Users/byeon/Downloads/character-predictions.csv")

character_deaths.join(character_predictions, "name").show

RDD 복습


val rdd = sc.textFile("/Users/byeon/Downloads/character-deaths.csv").map(t => t.split(",")).map {
    a => (a(0), a(1), a(2))
}

rdd.take(5).foreach(println)
// Tuple로 관리하는 것은 조금 번거로워서 case class를 만듬

case class Character(name: String, allegiances: String, deathYear: Option[Int], isDeath: Boolean)

val rdd = sc.textFile("/Users/byeon/Downloads/character-deaths.csv").map(t => t.split(",")).filter(a => a(0) != "Name").map {
    a => 
    val deathYear = if (a(2) !="") Some(a(2).toInt) else None
    val isDeath = !deathYear.isEmpty
    Character (a(0), a(1), deathYear, isDeath)
}
// rdd를 사용하려면 파싱을 해줘야 함. dataframe은 부가작업이 존재하지 않음

rdd.filter(_.allegiances =="Lannister").take(10).foreach(println)

// RDD To Dataset Dataframe
rdd.toDS
rdd.toDF("Name", "Allegiances", "DeathYear", "IsDeath")

Dataset 실습


  • Dataframe과 RDD의 중간 느낌
case class Character2(Name: String, Allegiances: String)
val ds = character_deaths.as[Character2]
// character_deaths의 character2를 가지고 있는 Dataset

// dataframe에선 filter 하려면
// untyped api
ds.filter($"Name" === "Addam Marbrand").show

// dataset에선 
// typed api
ds.filter(_.Name == "Addam Marbrand").show

// 에러 발생(Character2에서 정의를 안했기 때문에)
ds.filter(_.GoT == "Addam Marbrand").show

// 이런 방식으로 사용 가능
ds.filter(ds("Name") === "Addam Marbrand").show
// join을 했을 때 어느 컬럼인지 헷갈릴 수 있음. 이 경우 사용!

// Dataset to RDD
ds.rdd.map {
	_.Name.length
}.collect


카일스쿨 유튜브 채널을 만들었습니다. 데이터 사이언스, 성장, 리더십, BigQuery 등을 이야기할 예정이니, 관심 있으시면 구독 부탁드립니다 :)

PM을 위한 데이터 리터러시 강의를 만들었습니다. 문제 정의, 지표, 실험 설계, 문화 만들기, 로그 설계, 회고 등을 담은 강의입니다

이 글이 도움이 되셨거나 다양한 의견이 있다면 댓글 부탁드립니다 :)

Buy me a coffeeBuy me a coffee





© 2017. by Seongyun Byeon

Powered by zzsza