Ini belum dipublikasikan, tetapi di cabang master Alpakka, MongoSource.apply
mengambil parameter tipe:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Oleh karena itu, dengan rilis 0,18 Alpakka yang akan datang, Anda dapat melakukan hal berikut:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Perhatikan bahwa source
di sini mengasumsikan bahwa todoCollection.find()
mengembalikan Observable[TodoMongo]
; sesuaikan jenisnya sesuai kebutuhan.
Sementara itu, Anda cukup menambahkan kode di atas secara manual. Misalnya:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Perhatikan bahwa MyMongoSource
didefinisikan berada di akka.stream.alpakka.mongodb.scaladsl
paket (seperti MongoSource
), karena ObservableToPublisher
adalah kelas paket-pribadi. Anda akan menggunakan MyMongoSource
dengan cara yang sama seperti Anda menggunakan MongoSource
:
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())