Redis
 sql >> Teknologi Basis Data >  >> NoSQL >> Redis

Bagaimana cara menerapkan aliran masa depan untuk panggilan pemblokiran menggunakan futures.rs dan Redis PubSub?

Peringatan berat Saya belum pernah menggunakan perpustakaan ini sebelumnya, dan pengetahuan tingkat rendah saya tentang beberapa konsep agak... kurang. Sebagian besar saya membaca tutorial. Saya cukup yakin siapa pun yang telah melakukan pekerjaan async akan membaca ini dan tertawa, tetapi ini mungkin menjadi titik awal yang berguna bagi orang lain. Pembebas peringatan!

Mari kita mulai dengan sesuatu yang sedikit lebih sederhana, menunjukkan bagaimana Stream bekerja. Kita dapat mengonversi iterator Result s ke dalam aliran:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

Ini menunjukkan kepada kita satu cara untuk mengonsumsi arus. Kami menggunakan and_then untuk melakukan sesuatu pada setiap muatan (di sini hanya mencetaknya) dan kemudian for_each untuk mengonversi Stream kembali ke Future . Kami kemudian dapat menjalankan masa depan dengan memanggil forget yang anehnya bernama metode.

Berikutnya adalah mengikat perpustakaan Redis ke dalam campuran, menangani hanya satu pesan. Sejak get_message() metode memblokir, kita perlu memasukkan beberapa utas ke dalam campuran. Bukan ide yang baik untuk melakukan banyak pekerjaan dalam sistem asinkron jenis ini karena yang lainnya akan diblokir. Misalnya:

Kecuali jika diatur demikian, harus dipastikan bahwa implementasi fungsi ini selesai dengan sangat cepat .

Di dunia yang ideal, peti redis akan dibangun di atas perpustakaan seperti masa depan dan mengekspos semua ini secara asli.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Pemahaman saya semakin kabur di sini. Di utas terpisah, kami memblokir pesan dan mendorongnya ke saluran saat kami mendapatkannya. Apa yang saya tidak mengerti adalah mengapa kita harus memegang pegangan utas. Saya mengharapkan foo.forget akan memblokir dirinya sendiri, menunggu hingga alirannya kosong.

Dalam koneksi telnet ke server Redis, kirim ini:

publish rust awesome

Dan Anda akan melihatnya bekerja. Menambahkan pernyataan cetak menunjukkan bahwa (bagi saya) foo.forget pernyataan dijalankan sebelum utas muncul.

Banyak pesan lebih rumit. Sender mengkonsumsi dirinya sendiri untuk mencegah sisi pembangkit terlalu jauh di depan sisi konsumen. Ini dilakukan dengan mengembalikan masa depan lain dari send ! Kita perlu mengeluarkannya kembali dari sana untuk menggunakannya kembali untuk iterasi loop berikutnya:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Saya yakin akan ada lebih banyak ekosistem untuk jenis interoperasi ini seiring berjalannya waktu. Misalnya, peti futures-cpupool bisa mungkin diperluas untuk mendukung kasus penggunaan yang serupa dengan ini.




  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Konfigurasi Laravel Redis

  2. Bagaimana ServiceStack Redis berfungsi dalam mengambil data

  3. Kesalahan 99 menghubungkan ke localhost:6379. Tidak dapat menetapkan alamat yang diminta

  4. Server Pub/Sub Terkelola Redis

  5. Perintah Kustom Redis