PostgreSQL
 sql >> Teknologi Basis Data >  >> RDS >> PostgreSQL

Tunggu skrip Python menggunakan SQLAlchemy dan multiprocessing

Saya percaya TypeError berasal dari multiprocessing get .

Saya telah menghapus semua kode DB dari skrip Anda. Lihat ini:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Menggunakan r.wait mengembalikan hasil yang diharapkan, tetapi menggunakan r.get memunculkan TypeError . Seperti yang dijelaskan dalam dokumen python , gunakan r.wait setelah map_async .

Sunting :Saya harus mengubah jawaban saya sebelumnya. Saya sekarang percaya pada TypeError berasal dari SQLAlchemy. Saya telah mengubah skrip saya untuk mereproduksi kesalahan.

Edit 2 :Sepertinya masalahnya adalah multiprocessing.pool tidak berfungsi dengan baik jika ada pekerja yang memunculkan Pengecualian yang konstruktornya memerlukan parameter (lihat juga di sini ).

Saya telah mengubah skrip saya untuk menyoroti ini.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Dalam kasus Anda, mengingat kode Anda memunculkan pengecualian SQLAlchemy, satu-satunya solusi yang dapat saya pikirkan adalah menangkap semua pengecualian di do berfungsi dan menaikkan kembali Exception yang normal alih-alih. Sesuatu seperti ini:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Edit 3 :jadi, sepertinya ini adalah bug dengan Python , tetapi pengecualian yang tepat dalam SQLAlchemy akan mengatasinya:karenanya, saya telah mengangkat masalah dengan SQLAlchemy , juga.

Sebagai solusi masalah, saya pikir solusi di akhir Edit 2 akan dilakukan (membungkus panggilan balik dalam coba-kecuali dan menaikkan kembali).



  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. django.db.utils.ProgrammingError:relasi app_user tidak ada selama uji manage.py

  2. Flask-SQLAlchemy db.session.query(Model) vs Model.query

  3. Menulis fungsi dalam SQL untuk mengulang rentang tanggal dalam UDF

  4. Bagaimana saya mendapatkan psycopg2 logging dari waktu eksekusi kueri?

  5. mengapa menulis dalam tabel mencegah vacuums di lain?