Redis
 sql >> Teknologi Basis Data >  >> NoSQL >> Redis

Labu dengan Contoh – Menerapkan Antrian Tugas Redis

Bagian tutorial ini menjelaskan cara menerapkan antrean tugas Redis untuk menangani pemrosesan teks.

Pembaruan:

  • 12/02/2020:Ditingkatkan ke Python versi 3.8.1 serta versi terbaru Redis, Python Redis, dan RQ. Lihat di bawah untuk detailnya. Sebutkan bug di versi RQ terbaru dan berikan solusinya. Memecahkan bug http sebelum https.
  • 22/03/2016:Ditingkatkan ke Python versi 3.5.1 serta versi terbaru Redis, Python Redis, dan RQ. Lihat di bawah untuk detailnya.
  • 22/02/2015:Menambahkan dukungan Python 3.

Bonus Gratis: Klik di sini untuk mendapatkan akses ke video tutorial Flask + Python gratis yang menunjukkan cara membuat aplikasi web Flask, langkah demi langkah.

Ingat:Inilah yang sedang kami bangun—Aplikasi Flask yang menghitung pasangan frekuensi kata berdasarkan teks dari URL tertentu.

  1. Bagian Satu:Siapkan lingkungan pengembangan lokal, lalu terapkan lingkungan staging dan produksi di Heroku.
  2. Bagian Kedua:Menyiapkan database PostgreSQL bersama dengan SQLAlchemy dan Alembic untuk menangani migrasi.
  3. Bagian Tiga:Tambahkan logika back-end untuk mengikis lalu proses jumlah kata dari halaman web menggunakan perpustakaan permintaan, BeautifulSoup, dan Natural Language Toolkit (NLTK).
  4. Bagian Empat:Menerapkan antrian tugas Redis untuk menangani pemrosesan teks. (saat ini )
  5. Bagian Lima:Siapkan Angular di front-end untuk terus melakukan polling di back-end untuk melihat apakah permintaan sudah selesai diproses.
  6. Bagian Enam:Mendorong ke server staging di Heroku - menyiapkan Redis dan merinci cara menjalankan dua proses (web dan pekerja) pada satu Dyno.
  7. Bagian Tujuh:Perbarui front-end agar lebih ramah pengguna.
  8. Bagian Delapan:Buat Arahan Sudut khusus untuk menampilkan bagan distribusi frekuensi menggunakan JavaScript dan D3.

Butuh kode? Ambil dari repo.


Persyaratan Pemasangan

Alat yang digunakan:

  • Redis (5.0.7)
  • Python Redis (3.4.1)
  • RQ (1.2.2) - pustaka sederhana untuk membuat antrian tugas

Mulailah dengan mengunduh dan menginstal Redis dari situs resmi atau melalui Homebrew (brew install redis ). Setelah terinstal, mulai server Redis:

$ redis-server

Selanjutnya instal Python Redis dan RQ di jendela terminal baru:

$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt


Menyiapkan Pekerja

Mari kita mulai dengan membuat proses pekerja untuk mendengarkan tugas yang antri. Buat file baru worker.py , dan tambahkan kode ini:

import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Di sini, kami mendengarkan antrian yang disebut default dan membuat koneksi ke server Redis di localhost:6379 .

Jalankan ini di jendela terminal lain:

$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Sekarang kita perlu memperbarui app.py untuk mengirim pekerjaan ke antrian…



Perbarui app.py

Tambahkan impor berikut ke app.py :

from rq import Queue
from rq.job import Job
from worker import conn

Kemudian perbarui bagian konfigurasi:

app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn) siapkan koneksi Redis dan inisialisasi antrean berdasarkan koneksi tersebut.

Pindahkan fungsionalitas pemrosesan teks dari rute indeks kami dan ke fungsi baru yang disebut count_and_save_words() . Fungsi ini menerima satu argumen, sebuah URL, yang akan kita berikan padanya saat kita memanggilnya dari rute indeks kita.

def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # this import solves a rq bug which currently exists
        from app import count_and_save_words

        # get url that the person has entered
        url = request.form['url']
        if not url[:8].startswith(('https://', 'http://')):
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

Perhatikan kode berikut:

job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Catatan: Kita perlu mengimpor count_and_save_words fungsi dalam fungsi index karena paket RQ saat ini memiliki bug, di mana ia tidak akan menemukan fungsi dalam modul yang sama.

Di sini kita menggunakan antrian yang kita inisialisasi sebelumnya dan disebut enqueue_call() fungsi. Ini menambahkan pekerjaan baru ke antrian dan pekerjaan itu menjalankan count_and_save_words() fungsi dengan URL sebagai argumen. result_ttl=5000 argumen baris memberi tahu RQ berapa lama untuk mempertahankan hasil pekerjaan selama - 5.000 detik, dalam kasus ini. Kemudian kami mengeluarkan id pekerjaan ke terminal. ID ini diperlukan untuk melihat apakah pekerjaan sudah selesai diproses.

Mari kita siapkan rute baru untuk itu…



Dapatkan Hasil

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Mari kita uji ini.

Jalankan server, navigasikan ke http://localhost:5000/, gunakan URL https://realpython.com, dan ambil id pekerjaan dari terminal. Kemudian gunakan id itu di titik akhir ‘/results/’ - yaitu, http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.

Selama kurang dari 5.000 detik telah berlalu sebelum Anda memeriksa status, maka Anda akan melihat nomor id, yang dihasilkan saat kami menambahkan hasilnya ke database:

# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id

Sekarang, mari kita sedikit memfaktorkan ulang rute untuk mengembalikan hasil aktual dari database di JSON:

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Pastikan untuk menambahkan impor:

from flask import jsonify

Uji ini lagi. Jika semuanya berjalan dengan baik, Anda akan melihat sesuatu yang mirip dengan di browser Anda:

[
  [
    "Python", 
    315
  ], 
  [
    "intermediate", 
    167
  ], 
  [
    "python", 
    161
  ], 
  [
    "basics", 
    118
  ], 
  [
    "web-dev", 
    108
  ], 
  [
    "data-science", 
    51
  ], 
  [
    "best-practices", 
    49
  ], 
  [
    "advanced", 
    45
  ], 
  [
    "django", 
    43
  ], 
  [
    "flask", 
    41
  ]
]


Apa Selanjutnya?

Bonus Gratis: Klik di sini untuk mendapatkan akses ke video tutorial Flask + Python gratis yang menunjukkan cara membuat aplikasi web Flask, langkah demi langkah.

Di Bagian 5 kita akan menyatukan klien dan server dengan menambahkan Angular ke dalam campuran untuk membuat poller, yang akan mengirim permintaan setiap lima detik ke /results/<job_key> titik akhir meminta pembaruan. Setelah data tersedia, kami akan menambahkannya ke DOM.

Semangat!

Ini adalah karya kolaborasi antara Cam Linke, salah satu pendiri Startup Edmonton, dan orang-orang di Real Python



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Frontend dan backend cache tunggal

  2. Menggunakan redis sebagai cache LRU untuk postgres

  3. phpMyAdmin setara dengan MySQL untuk Redis?

  4. Cara menggunakan perintah UNSUBSCRIBE di Redis 2.6.11

  5. Prototipe objek Node.JS hanya dapat berupa Objek atau nol dengan Redis