MongoDB
 sql >> Teknologi Basis Data >  >> NoSQL >> MongoDB

Sink Kafka Stream ke MongoDB menggunakan PySpark Structured Streaming

Saya menemukan solusi. Karena saya tidak dapat menemukan driver Mongo yang tepat untuk Streaming Terstruktur, saya mengerjakan solusi lain. Sekarang, saya menggunakan koneksi langsung ke mongoDb, dan menggunakan "foreach(...)" alih-alih foreachbatch(. ..). Kode saya terlihat seperti ini di file testSpark.py:

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Di Jongo, bagaimana menemukan banyak dokumen dari Mongodb dengan daftar ID

  2. driver nodejs mongodb memutuskan koneksi saat idle

  3. Luwak:findOneAndUpdate tidak memperbarui bidang yang ada

  4. data pegas mongodb memetakan bidang dinamis

  5. Penyatuan koneksi di Spring Boot dan mongo db