Di Spark, fungsi pada RDD
s (seperti map
di sini) diserialisasi dan dikirim ke pelaksana untuk diproses. Ini menyiratkan bahwa semua elemen yang terkandung dalam operasi tersebut harus serializable.
Koneksi Redis di sini tidak dapat diserialisasi karena membuka koneksi TCP ke DB target yang terikat ke mesin tempat koneksi dibuat.
Solusinya adalah membuat koneksi tersebut pada eksekutor, dalam konteks eksekusi lokal. Ada beberapa cara untuk melakukannya. Dua yang muncul di pikiran adalah:
rdd.mapPartitions
:memungkinkan Anda memproses seluruh partisi sekaligus, dan karenanya mengamortisasi biaya pembuatan koneksi)- Manajer koneksi tunggal:Buat koneksi sekali per pelaksana
mapPartitions
lebih mudah karena yang dibutuhkan hanyalah sedikit perubahan pada struktur program:
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
Manajer koneksi tunggal dapat dimodelkan dengan objek yang menyimpan referensi malas ke koneksi (catatan:referensi yang dapat diubah juga akan berfungsi).
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
Objek ini kemudian dapat digunakan untuk membuat instance 1 koneksi per JVM pekerja dan digunakan sebagai Serializable
objek dalam penutupan operasi.
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
Keuntungan menggunakan objek tunggal adalah lebih sedikit overhead karena koneksi dibuat hanya sekali oleh JVM (sebagai lawan 1 per partisi RDD)
Ada juga beberapa kelemahan:
- pembersihan koneksi itu rumit (shutdown hook/timer)
- seseorang harus memastikan keamanan thread dari sumber daya bersama
(*) kode yang disediakan untuk tujuan ilustrasi. Tidak dikompilasi atau diuji.