Di Apache Kafka, aplikasi Java yang disebut produsen menulis pesan terstruktur ke cluster Kafka (terdiri dari broker). Demikian pula, aplikasi Java yang disebut konsumen membaca pesan-pesan ini dari cluster yang sama. Di beberapa organisasi, ada kelompok berbeda yang bertugas menulis dan mengelola produsen dan konsumen. Dalam kasus seperti itu, satu titik masalah utama adalah koordinasi format pesan yang disepakati antara produsen dan konsumen.
Contoh ini menunjukkan cara menggunakan Apache Avro untuk membuat serial catatan yang diproduksi ke Apache Kafka sambil memungkinkan evolusi skema dan pembaruan nonsinkron dari aplikasi produsen dan konsumen.
Serialisasi dan Deserialisasi
Catatan Kafka (sebelumnya disebut pesan) terdiri dari kunci, nilai, dan header. Kafka tidak mengetahui struktur data dalam kunci dan nilai record. Ini menangani mereka sebagai array byte. Tetapi sistem yang membaca catatan dari Kafka benar-benar peduli dengan data dalam catatan tersebut. Jadi, Anda perlu menghasilkan data dalam format yang dapat dibaca. Format data yang Anda gunakan harus
- Bersikaplah kompak
- Cepat untuk menyandikan dan mendekode
- Izinkan evolusi
- Izinkan sistem upstream (yang menulis ke cluster Kafka) dan sistem downstream (yang membaca dari cluster Kafka yang sama) untuk mengupgrade ke skema yang lebih baru pada waktu yang berbeda
JSON, misalnya, cukup jelas tetapi bukan format data yang ringkas dan lambat untuk diuraikan. Avro adalah kerangka serialisasi cepat yang menghasilkan keluaran yang relatif ringkas. Tetapi untuk membaca catatan Avro, Anda memerlukan skema yang digunakan untuk membuat serial data.
Salah satu opsi adalah menyimpan dan mentransfer skema dengan catatan itu sendiri. Ini baik-baik saja dalam file tempat Anda menyimpan skema sekali dan menggunakannya untuk sejumlah besar catatan. Menyimpan skema di setiap catatan Kafka, bagaimanapun, menambahkan overhead yang signifikan dalam hal ruang penyimpanan dan pemanfaatan jaringan. Opsi lainnya adalah memiliki seperangkat pemetaan pengidentifikasi-skema yang disepakati dan merujuk ke skema dengan pengidentifikasinya dalam catatan.
Dari Object ke Kafka Record dan Kembali
Aplikasi produser tidak perlu mengonversi data secara langsung ke array byte. KafkaProducer adalah kelas generik yang membutuhkan penggunanya untuk menentukan tipe kunci dan nilai. Kemudian, produsen menerima instance ProducerRecord
yang memiliki tipe parameter yang sama. Konversi dari objek ke array byte dilakukan oleh Serializer. Kafka menyediakan beberapa serializer primitif:misalnya, IntegerSerializer
, ByteArraySerializer
, StringSerializer
. Di sisi konsumen, Deserializer serupa mengonversi array byte ke objek yang dapat ditangani aplikasi.
Jadi masuk akal untuk terhubung di tingkat Serializer dan Deserializer dan memungkinkan pengembang aplikasi produsen dan konsumen untuk menggunakan antarmuka yang nyaman yang disediakan oleh Kafka. Meskipun versi terbaru Kafka mengizinkan ExtendedSerializers
dan ExtendedDeserializers
untuk mengakses header, kami memutuskan untuk menyertakan pengidentifikasi skema di kunci dan nilai catatan Kafka alih-alih menambahkan header catatan.
Esensi Avro
Avro adalah kerangka kerja serialisasi data (dan panggilan prosedur jarak jauh). Ini menggunakan dokumen JSON yang disebut skema untuk menggambarkan struktur data. Sebagian besar penggunaan Avro adalah melalui GenericRecord atau subclass dari SpecificRecord. Kelas Java yang dihasilkan dari skema Avro adalah subkelas dari yang terakhir, sedangkan yang pertama dapat digunakan tanpa pengetahuan sebelumnya tentang struktur data yang digunakan.
Ketika dua skema memenuhi seperangkat aturan kompatibilitas, data yang ditulis dengan satu skema (disebut skema penulis) dapat dibaca seolah-olah ditulis dengan skema lainnya (disebut skema pembaca). Skema memiliki bentuk kanonik yang memiliki semua detail yang tidak relevan untuk serialisasi, seperti komentar, dihilangkan untuk membantu pemeriksaan kesetaraan.
VersionedSchema dan SchemaProvider
Seperti disebutkan sebelumnya, kita memerlukan pemetaan satu-ke-satu antara skema dan pengidentifikasinya. Terkadang lebih mudah untuk merujuk ke skema dengan nama. Ketika skema yang kompatibel dibuat, itu dapat dianggap sebagai versi skema berikutnya. Jadi kita bisa merujuk ke skema dengan nama, pasangan versi. Mari kita sebut skema, pengidentifikasinya, nama, dan versinya sebagai VersionedSchema
. Objek ini mungkin menyimpan metadata tambahan yang dibutuhkan aplikasi.
public class VersionedSchema { private final int id; private final String name; private final int version; private final Schema schema; public VersionedSchema(int id, String name, int version, Schema schema) { this.id = id; this.name = name; this.version = version; this.schema = schema; } public String getName() { return name; } public int getVersion() { return version; } public Schema getSchema() { return schema; } public int getId() { return id; } }
SchemaProvider
objek dapat mencari contoh VersionedSchema
.
public interface SchemaProvider extends AutoCloseable { public VersionedSchema get(int id); public VersionedSchema get(String schemaName, int schemaVersion); public VersionedSchema getMetadata(Schema schema); }
Bagaimana antarmuka ini diterapkan tercakup dalam "Menerapkan Toko Skema" di entri blog mendatang.
Menserialisasikan Data Umum
Saat membuat serial catatan, pertama-tama kita perlu mencari tahu Skema mana yang akan digunakan. Setiap record memiliki getSchema
metode. Tetapi mencari tahu pengidentifikasi dari skema mungkin memakan waktu. Biasanya lebih efisien untuk mengatur skema pada waktu inisialisasi. Ini dapat dilakukan secara langsung dengan pengidentifikasi atau dengan nama dan versi. Selanjutnya, ketika memproduksi untuk beberapa topik, kita mungkin ingin mengatur skema yang berbeda untuk topik yang berbeda dan mencari tahu skema dari nama topik yang diberikan sebagai parameter untuk metode serialize(T, String)
. Logika ini dihilangkan dalam contoh kami demi singkatnya dan kesederhanaan.
private VersionedSchema getSchema(T data, String topic) { return schemaProvider.getMetadata( data.getSchema()); }
Dengan skema di tangan, kita perlu menyimpannya dalam pesan kita. Membuat serial ID sebagai bagian dari pesan memberi kami solusi ringkas, karena semua keajaiban terjadi di Serializer/Deserializer. Ini juga memungkinkan integrasi yang sangat mudah dengan kerangka kerja dan pustaka lain yang sudah mendukung Kafka dan memungkinkan pengguna menggunakan serializer mereka sendiri (seperti Spark).
Dengan menggunakan pendekatan ini, pertama-tama kita menulis pengidentifikasi skema pada empat byte pertama.
private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException { try (DataOutputStream os = new DataOutputStream(stream)) { os.writeInt(id); } }
Kemudian kita dapat membuat DatumWriter
dan membuat serial objek.
private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException { BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null); DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema); datumWriter.write(data, encoder); encoder.flush(); }
Menyatukan semua ini, kami telah mengimplementasikan serializer data generik.
public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> { private SchemaProvider schemaProvider; @Override public void configure(Map<String, ?> configs, boolean isKey) { schemaProvider = SchemaUtils.getSchemaProvider(configs); } @Override public byte[] serialize(String topic, T data) { try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { VersionedSchema schema = getSchema(data, topic); writeSchemaId(stream, schema.getId()); writeSerializedAvro(stream, data, schema.getSchema()); return stream.toByteArray(); } catch (IOException e) { throw new RuntimeException("Could not serialize data", e); } } private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...} private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...} private VersionedSchema getSchema(T data, String topic) {...} @Override public void close() { try { schemaProvider.close(); } catch (Exception e) { throw new RuntimeException(e); } } }
Deserialisasi Data Umum
Deserialisasi dapat bekerja dengan satu skema (data skema ditulis dengan) tetapi Anda dapat menentukan skema pembaca yang berbeda. Skema pembaca harus kompatibel dengan skema yang digunakan untuk membuat serial data, tetapi tidak harus setara. Untuk alasan ini, kami memperkenalkan nama skema. Kami sekarang dapat menentukan bahwa kami ingin membaca data dengan versi skema tertentu. Pada saat inisialisasi, kami membaca versi skema yang diinginkan per nama skema dan menyimpan metadata di readerSchemasByName
untuk akses cepat. Sekarang kita dapat membaca setiap record yang ditulis dengan versi skema yang kompatibel seolah-olah ditulis dengan versi yang ditentukan.
@Override public void configure(Map<String, ?> configs, boolean isKey) { this.schemaProvider = SchemaUtils.getSchemaProvider(configs); this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider); }
Ketika sebuah record perlu di-deserialized, pertama-tama kita membaca identifier dari skema writer. Ini memungkinkan pencarian skema pembaca berdasarkan nama. Dengan kedua skema yang tersedia, kita dapat membuat GeneralDatumReader
dan baca catatannya.
@Override public GenericData.Record deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); VersionedSchema readerSchema = readerSchemasByName.get(writerSchema.getName()); GenericData.Record avroRecord = readAvroRecord(stream, writerSchema.getSchema(), readerSchema.getSchema()); return avroRecord; } catch (IOException e) { throw new RuntimeException(e); } } private int readSchemaId(InputStream stream ) throws IOException { try(DataInputStream is = new DataInputStream(stream)) { return is.readInt(); } } private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); GenericData.Record record = new GenericData.Record(readerSchema); datumReader.read(record, decoder); return record; }
Menangani SpecificRecords
Lebih sering daripada tidak ada satu kelas yang ingin kita gunakan untuk catatan kita. Kelas ini kemudian biasanya dihasilkan dari skema Avro. Apache Avro menyediakan alat untuk menghasilkan kode Java dari skema. Salah satu alat tersebut adalah plugin Avro Maven. Kelas yang dihasilkan memiliki skema tempat mereka dibuat tersedia saat runtime. Ini membuat serialisasi dan deserialisasi lebih sederhana dan lebih efektif. Untuk serialisasi, kita bisa menggunakan kelas untuk mencari tahu tentang pengidentifikasi skema yang akan digunakan.
@Override public void configure(Map<String, ?> configs, boolean isKey) { String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString(); try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) { Class<?> recordClass = Class.forName(className); Schema writerSchema = new SpecificData(recordClass.getClassLoader()).getSchema(recordClass); this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId(); } catch (Exception e) { throw new RuntimeException(e); } }
Dengan demikian kita tidak memerlukan logika untuk menentukan skema dari topik dan data. Kami menggunakan skema yang tersedia di kelas record untuk menulis record.
Demikian pula, untuk deserialisasi, skema pembaca dapat ditemukan dari kelas itu sendiri. Logika deserialisasi menjadi lebih sederhana, karena skema pembaca ditetapkan pada waktu konfigurasi dan tidak perlu mencari nama skema.
@Override public T deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); return readAvroRecord(stream, writerSchema.getSchema(), readerSchema); } catch (IOException e) { throw new RuntimeException(e); } } private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); return datumReader.read(null, decoder); }
Bacaan Tambahan
Untuk informasi lebih lanjut tentang kompatibilitas skema, lihat spesifikasi Avro untuk Resolusi Skema.
Untuk informasi lebih lanjut tentang formulir kanonik, lihat spesifikasi Avro untuk Parsing Formulir Kanonis untuk Skema.
Lain kali…
Bagian 2 akan menunjukkan implementasi sistem untuk menyimpan definisi skema Avro.