Saya bukan ahli mongodb, tetapi berdasarkan contoh yang saya lihat, ini adalah pola yang akan saya coba.
Saya telah menghilangkan peristiwa selain data, karena pembatasan itu tampaknya menjadi perhatian utama.
var cursor = db.collection('mycollection').find({});
const cursorNext = new Rx.BehaviourSubject('next'); // signal first batch then wait
const nextBatch = () => {
if(cursor.hasNext()) {
cursorNext.next('next');
}
});
cursorNext
.switchMap(() => // wait for cursorNext to signal
Rx.Observable.fromPromise(cursor.next()) // get a single doc
.repeat() // get another
.takeWhile(() => cursor.hasNext() ) // stop taking if out of data
.take(batchSize) // until full batch
.toArray() // combine into a single emit
)
.map(docsBatch => {
// do something with the batch
// return docsBatch or modified doscBatch
})
... // other operators?
.subscribe(x => {
...
nextBatch();
});
Saya mencoba untuk menguji aliran Rx ini tanpa mongodb, sementara ini mungkin memberi Anda beberapa ide.