PostgreSQL
 sql >> Teknologi Basis Data >  >> RDS >> PostgreSQL

Antrian pekerjaan sebagai tabel SQL dengan banyak konsumen (PostgreSQL)

Saya menggunakan postgres untuk antrian FIFO juga. Saya awalnya menggunakan ACCESS EXCLUSIVE, yang menghasilkan hasil yang benar dalam konkurensi tinggi, tetapi memiliki efek yang tidak menguntungkan karena saling eksklusif dengan pg_dump, yang memperoleh kunci ACCESS SHARE selama eksekusinya. Ini menyebabkan fungsi next() saya terkunci untuk waktu yang sangat lama (durasi pg_dump). Ini tidak dapat diterima karena kami adalah toko 24x7 dan pelanggan tidak suka waktu mati di antrian di tengah malam.

Saya pikir pasti ada kunci yang tidak terlalu membatasi yang masih aman secara bersamaan dan tidak mengunci saat pg_dump sedang berjalan. Pencarian saya membawa saya ke pos SO ini.

Kemudian saya melakukan riset.

Mode berikut ini cukup untuk fungsi FIFO queue NEXT() yang akan memperbarui status pekerjaan dari queued untuk berlari tanpa gagal konkurensi, dan juga tidak memblokir pg_dump:

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

Pertanyaan:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

Hasilnya terlihat seperti:

UPDATE 1
 job_id
--------
     98
(1 row)

Berikut adalah skrip shell yang menguji semua mode kunci yang berbeda pada konkurensi tinggi (30).

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

Kode juga ada di sini jika Anda ingin mengedit:https://Gist.github.com/1083936

Saya memperbarui aplikasi saya untuk menggunakan mode EKSKLUSIF karena ini adalah mode paling ketat yang a) benar dan b) tidak bertentangan dengan pg_dump. Saya memilih yang paling ketat karena tampaknya paling tidak berisiko dalam hal mengubah aplikasi dari ACCESS EXCLUSIVE tanpa menjadi ahli uber dalam penguncian postgres.

Saya merasa cukup nyaman dengan alat uji saya dan dengan ide-ide umum di balik jawabannya. Saya harap berbagi ini membantu memecahkan masalah ini untuk orang lain.



  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Uji fungsi null dengan parameter yang bervariasi

  2. Solusi Cloud PostgreSQL Terkelola Pembandingan - Bagian Empat:Microsoft Azure

  3. Caching di PostgreSQL

  4. Mengapa postgres tidak menggunakan indeks dalam kueri saya

  5. Bagaimana cara menyegarkan entitas JPA ketika basis data backend berubah secara tidak sinkron?