Masalah terpecahkan! Saya tidak percaya saya telah menghabiskan dua hari penuh untuk ini... Saya benar-benar melihat ke arah yang salah.
Masalahnya bukan pada beberapa konfigurasi jaringan Dataflow atau GCP, dan sejauh yang saya tahu...
benar.
Masalahnya tentu saja dalam kode saya:hanya masalahnya yang terungkap hanya di lingkungan terdistribusi. Saya telah membuat kesalahan dengan membuka terowongan dari prosesor pipa utama, bukan dari pekerja. Jadi terowongan SSH naik tetapi tidak antara pekerja dan server target, hanya antara pipa utama dan target!
Untuk memperbaikinya, saya harus mengubah permintaan DoFn saya untuk membungkus eksekusi kueri dengan terowongan :
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""
def __init__(self, *args, **kwargs):
self.dbport = kwargs["port"]
self.dbhost = kwargs["host"]
self.args = args
self.kwargs = kwargs
super().__init__(*args, **kwargs)
def process(self, query, *args, **kwargs):
# Remote side of the SSH Tunnel
remote_address = (self.dbhost, self.dbport)
ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
with open_tunnel(
ssh_tunnel,
ssh_username=self.kwargs["ssh_user"],
ssh_password=self.kwargs["ssh_password"],
remote_bind_address=remote_address,
set_keepalive=10.0
) as tunnel:
forwarded_port = tunnel.local_bind_port
self.kwargs["port"] = forwarded_port
source = sql.SQLSource(*self.args, **self.kwargs)
sql.SQLSouceInput._build_value(source, source.runtime_params)
logging.info("Processing - {}".format(query))
for records, schema in source.client.read(query):
for row in records:
yield source.client.row_as_dict(row, schema)
seperti yang Anda lihat, saya harus mengganti beberapa bit perpustakaan pysql_beam.
Akhirnya, setiap pekerja membuka terowongannya sendiri untuk setiap permintaan. Mungkin saja untuk mengoptimalkan perilaku ini, tetapi itu sudah cukup untuk kebutuhan saya.