Apache Spark RDD NotSerializableException


Apache Spark RDD NotSerializableException 오류에 대한 포스팅입니다!

예제


  • MySQL에 Insert하는 사례
class Mysql() {
    def createConnection() {}
    def insert(n: Int) {}
}

val mysql = new Mysql()
mysql.createConnection

val rdd = sc.makeRDD(List(1,2,3,4))

rdd.foreach {
    n => mysql.insert(n)
}
  • rdd.foreach는 워커에서 실행되는 작업!
  • rdd.map(f)와 같은 경우 f라는 함수 코드가 클러스터에 전송되서 사용됩니다
  • 따라서 f는 다른 클러스터에 전송 가능한 형태여야 사용 가능
  • 이것이 안될 경우 NotSerializableException 발생

해법


rdd.foreachPartition {
    iter =>
    val mysql = new Mysql()
    mysql.createConnection
    iter.foreach {
        n => mysql.insert(n)
    }
}	
  • foreachPartition을 사용해 해결!

카일스쿨 유튜브 채널을 만들었습니다. 데이터 사이언스, 성장, 리더십, BigQuery 등을 이야기할 예정이니, 관심 있으시면 구독 부탁드립니다 :)

PM을 위한 데이터 리터러시 강의를 만들었습니다. 문제 정의, 지표, 실험 설계, 문화 만들기, 로그 설계, 회고 등을 담은 강의입니다

이 글이 도움이 되셨거나 다양한 의견이 있다면 댓글 부탁드립니다 :)

Buy me a coffeeBuy me a coffee





© 2017. by Seongyun Byeon

Powered by zzsza