Saya berada pada masalah yang sama tidak yakin apakah Anda menemukan solusi atau tidak, tetapi saya dapat mencapai sesuatu yang serupa dengan melakukan hal berikut. Pertama, saya menambahkan pemicu ke tabel saya
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;
Ini akan mengatur pemicu pada tabel setiap kali baris, diperbarui, dihapus, atau disisipkan. Kemudian akan memanggil fungsi pemicu yang telah saya atur yang terlihat seperti ini:
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;
Ini akan memungkinkan saya untuk 'mendengarkan' salah satu pembaruan ini dari proyek boot musim semi saya dan itu akan mengirim seluruh baris sebagai muatan. Selanjutnya, dalam proyek boot musim semi saya, saya mengonfigurasi koneksi ke db saya.
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}
Dengan itu saya Autowire (injeksi ketergantungan) ke konstruktor di kelas layanan saya dan melemparkannya ke kelas PostgressqlConnection r2dbc seperti:
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
Sekarang kami ingin 'mendengarkan' tabel kami dan mendapatkan pemberitahuan saat melakukan beberapa pembaruan pada tabel kami. Untuk melakukannya, kami menyiapkan metode inisialisasi yang dilakukan setelah injeksi ketergantungan dengan menggunakan anotasi @PostContruct
@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}
Perhatikan bahwa kita mendengarkan nama apa pun yang kita masukkan ke dalam metode pg_notify. Kami juga ingin menyiapkan metode untuk menutup koneksi saat kacang akan dibuang, seperti:
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}
Sekarang saya cukup membuat metode yang mengembalikan Fluks dari apa pun yang saat ini ada di tabel saya, dan saya juga menggabungkannya dengan notifikasi saya, seperti yang saya katakan sebelum notifikasi masuk sebagai json, jadi saya harus membatalkan serialisasi dan saya memutuskan untuk menggunakan Pemeta objek. Jadi, akan terlihat seperti ini:
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}
Semoga ini bisa membantu. Cheers!