Apache Spark Streaming
in Data on Engineering
Apache Spark Streaming에 대한 글입니다
Spark Streaming
- 다양한 소스로부터 실시간 스트리밍 데이터 처리
- Spark RDD와 사용 방법이 유사하며 lambda 아키텍쳐를 만들기 좋음
- Structured Streaming라는 것도 최근 추가됨 : 공식 문서
- 스트림 데이터를 일정 단위로 쪼개어 batch 처리
- DStream (discreatized stream 불연속적 스트림)
- 데이터를 끊어서 연속된 RDD로 만들어 처리
- 데이터를 아주 짧은 주기로 처리 (ex: 1초마다 처리)
- SparkConf
- 여러가지 설정을 저장
- AppName과 master 주소
- StreamingContext
- 소스 (DStream, RDD) 생성과 스트리밍 처리 시작, 종료 등을 수행
- Input DStream
- Input data를 표현하는 DStream, Receiver와 연동
- Receiver
- 건별로 들어오는 데이터를 모아서 처리할 수 있도록 처리하는 친구
- 데이터를 받아서 Spark 메모리에 저장해놓음
DStream Operations
- Transformations
- RDD와 거의 유사
- Window Operations
- 최근 1분(주기)의 평균을 1초마다 refresh하고 싶을 경우!
- Time Window 개념을 제공
- window length
- Output Operations
- foreachRDD : 자유도가 높음
- saveAsTextFile, saveAsObjectFile, saveAsHadoopFile 등
Example
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Seconds(1) : 1초 주기
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
// start를 해야 시작!
ssc.awaitTermination() // Wait for the computation to terminate
Streaming
Twitter API
- Twitter 가입
- 전화번호 인증
- https://apps.twitter.com/ 에서 앱 만들기
- Keys and Access Tokens 클릭
- Your Access Token에서 키 생성
- 다음 값 챙기기
- Consumer Key (API Key)
- Consumer Secret (API Secret)
- Access Token
- Access Token Secret
build.sbt 설정
lazy val root = (project in file(".")).
settings(
inThisBuild(List(
organization := "com.example",
scalaVersion := "2.11.7",
version := "0.1.0-SNAPSHOT"
)),
name := "Hello",
libraryDependencies ++= List(
"org.twitter4j" % "twitter4j-core" % "4.0.6",
"org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.1.0",
"org.apache.spark" % "spark-core_2.11" % "2.2.0",
"org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
),
retrieveManaged := true
)
twitterStreaming File
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
object twitterStreaming extends App{
println("hello")
val appName = "spark_course"
val master = "local[*]"
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(10))
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
val consumerKey = "****"
val consumerSecret = "****"
val accessToken = "****"
val accessTokenSecret = "****"
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
val stream = TwitterUtils.createStream(ssc, None)
// stream.print()
// val text = stream.map(status => status.getText) // 아래처럼 리팩토링
val text = stream.map(_.getText)
val hashTags = text.flatMap(_.split(" ")).filter(_.startsWith("#"))
// val hashTagCounts = hashTags.map(h => (h, 1)).reduceByKey(_+_)
val hashTagCounts = hashTags.map(h => (h, 1)).reduceByKeyAndWindow(_+_, Seconds(60)) // Window 사용 : 60초의 데이터를 모음
// text.print()
// hashTags.print()
// hashTagCounts.print() // 순서대로 정렬은 안됨..! foreachRDD를 사용하자
hashTagCounts.foreachRDD {
rdd =>
println("=====")
rdd.sortBy(_._2, false).take(10).foreach(println)
}
ssc.start()
ssc.awaitTermination()
}
- TwitterUtils 살펴보기! TwitterInputDStream, getReceiver 함수들 확인해보기
- TwitterReceiver의 onstart()할 때 store(status)도 확인해보기!
- Streaming같은 경우 처리를 여러 컴퓨터로 하면 날라갈 수 있음!
- 정확하게 1번만 처리하는 것이 어려운 이슈
- 실시간 처리할 때 정확도를 조금 포기하는편
- 그래서 정확한 수치가 필요한 것은 배치로 처리!
- Kafka가 Spark로 쏘고, S3로 쏘기도 함 (람다 아키텍쳐)
Performance Tuning
- 스트리밍은 정해진 batch 주기 이내에 데이터 처리가 모두 끝나야해서 성능이 중요
- 병렬처리, 데이터 Serialization 방법, 메모리, GC 튜닝 등 수많은 요소를 점검하고 손봐야 합니다
- Tuning Guide 및 Streaming 문서 참고
카일스쿨 유튜브 채널을 만들었습니다. 데이터 분석, 커리어에 대한 내용을 공유드릴 예정입니다.
PM을 위한 데이터 리터러시 강의를 만들었습니다. 문제 정의, 지표, 실험 설계, 문화 만들기, 로그 설계, 회고 등을 담은 강의입니다
이 글이 도움이 되셨거나 의견이 있으시면 댓글 남겨주셔요.