EDIT 27-01-2018:
Ternyata masalah ini terkait dengan DirectRunner. Jika Anda menjalankan pipeline yang sama menggunakan DataflowRunner, Anda akan mendapatkan batch yang sebenarnya hingga 1.000 record. DirectRunner selalu membuat bundel ukuran 1 setelah operasi pengelompokan.
Jawaban asli:
Saya mengalami masalah yang sama saat menulis ke database cloud menggunakan JdbcIO Apache Beam. Masalahnya adalah bahwa sementara JdbcIO mendukung penulisan hingga 1.000 catatan dalam satu batch, saya belum pernah benar-benar melihatnya menulis lebih dari 1 baris sekaligus (harus saya akui:Ini selalu menggunakan DirectRunner dalam lingkungan pengembangan).
Karena itu saya telah menambahkan fitur ke JdbcIO di mana Anda dapat mengontrol ukuran kumpulan sendiri dengan mengelompokkan data Anda dan menulis setiap grup sebagai satu kumpulan. Di bawah ini adalah contoh cara menggunakan fitur ini berdasarkan contoh WordCount asli Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
Bedanya dengan write-method biasa JdbcIO adalah metode barunya writeIterable()
yang membutuhkan PCollection<Iterable<RowT>>
sebagai input alih-alih PCollection<RowT>
. Setiap Iterable ditulis sebagai satu batch ke database.
Versi JdbcIO dengan tambahan ini dapat ditemukan di sini:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/Apache/beam/sdk/io/jdbc/JdbcIO.java
Seluruh contoh proyek yang berisi contoh di atas dapat ditemukan di sini:https://github.com/ olavloite/spanner-beam-example
(Ada juga permintaan tarik yang tertunda di Apache Beam untuk memasukkan ini ke dalam proyek)