Menurut kesalahan, Anda sudah memiliki string, (Anda sudah melakukan df.selectExpr("CAST(value AS STRING)")
), jadi Anda harus mencoba mendapatkan acara Baris sebagai String
, dan bukan Array[Byte]
Mulailah dengan mengubah
val valueStr = new String(record.getAs[Array[Byte]]("value"))
ke
val valueStr = record.getAs[String]("value")
Saya mengerti Anda mungkin sudah memiliki cluster untuk menjalankan kode Spark, tetapi saya sarankan untuk tetap melihat Konektor Wastafel Mongo Kafka Connect sehingga Anda tidak perlu menulis dan memelihara penulis Mongo Anda sendiri dalam kode Spark.
Atau, Anda dapat menulis set data Spark ke mongo secara langsung juga