Saya percaya TypeError
berasal dari multiprocessing
get
.
Saya telah menghapus semua kode DB dari skrip Anda. Lihat ini:
import multiprocessing
import sqlalchemy.exc
def do(kwargs):
i = kwargs['i']
print i
raise sqlalchemy.exc.ProgrammingError("", {}, None)
return i
pool = multiprocessing.Pool(processes=5) # start 4 worker processes
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
# Use get or wait?
# r.get()
r.wait()
pool.close()
pool.join()
print results
Menggunakan r.wait
mengembalikan hasil yang diharapkan, tetapi menggunakan r.get
memunculkan TypeError
. Seperti yang dijelaskan dalam dokumen python
, gunakan r.wait
setelah map_async
.
Sunting :Saya harus mengubah jawaban saya sebelumnya. Saya sekarang percaya pada TypeError
berasal dari SQLAlchemy. Saya telah mengubah skrip saya untuk mereproduksi kesalahan.
Edit 2 :Sepertinya masalahnya adalah multiprocessing.pool
tidak berfungsi dengan baik jika ada pekerja yang memunculkan Pengecualian yang konstruktornya memerlukan parameter (lihat juga di sini
).
Saya telah mengubah skrip saya untuk menyoroti ini.
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
class GoodExc(Exception):
def __init__(self, a=None):
'''Optional param in the constructor.'''
self.a = a
def do(kwargs):
i = kwargs['i']
print i
raise BadExc('a')
# raise GoodExc('a')
return i
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Dalam kasus Anda, mengingat kode Anda memunculkan pengecualian SQLAlchemy, satu-satunya solusi yang dapat saya pikirkan adalah menangkap semua pengecualian di do
berfungsi dan menaikkan kembali Exception
yang normal alih-alih. Sesuatu seperti ini:
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
def do(kwargs):
try:
i = kwargs['i']
print i
raise BadExc('a')
return i
except Exception as e:
raise Exception(repr(e))
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Edit 3 :jadi, sepertinya ini adalah bug dengan Python , tetapi pengecualian yang tepat dalam SQLAlchemy akan mengatasinya:karenanya, saya telah mengangkat masalah dengan SQLAlchemy , juga.
Sebagai solusi masalah, saya pikir solusi di akhir Edit 2 akan dilakukan (membungkus panggilan balik dalam coba-kecuali dan menaikkan kembali).