Terima kasih kepada Pengyu Wang, pengembang perangkat lunak di FINRA, atas izin untuk menerbitkan ulang pos ini.
Tabel Salted Apache HBase dengan pra-pemisahan adalah solusi HBase yang terbukti efektif untuk menyediakan distribusi beban kerja yang seragam di seluruh RegionServers dan mencegah hot spot selama penulisan massal. Dalam desain ini, kunci baris dibuat dengan kunci logis ditambah garam di awal. Salah satu cara menghasilkan garam adalah dengan menghitung n (jumlah wilayah) modulo pada kode hash dari kunci baris logis (tanggal, dll).
Tombol Baris Pengasinan
Misalnya, tabel yang menerima pemuatan data setiap hari mungkin menggunakan kunci baris logis yang dimulai dengan tanggal, dan kami ingin membagi tabel ini menjadi 1.000 wilayah sebelumnya. Dalam hal ini, kami berharap dapat menghasilkan 1.000 garam yang berbeda. Garam dapat dibuat, misalnya, sebagai:
StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logicalKey = 2015-04-26|abc rowKey = 893|2015-04-26|abc
Keluaran dari hashCode()
dengan modulo memberikan keacakan untuk nilai garam dari "000" hingga "999". Dengan transformasi kunci ini, tabel dipecah terlebih dahulu pada batas garam saat dibuat. Ini akan membuat volume baris terdistribusi secara merata saat memuat HFiles dengan MapReduce bulkload. Ini menjamin bahwa kunci baris dengan garam yang sama jatuh ke wilayah yang sama.
Dalam banyak kasus penggunaan, seperti pengarsipan data, Anda perlu memindai atau menyalin data melalui rentang kunci logis tertentu (rentang tanggal) menggunakan tugas MapReduce. Tabel standar pekerjaan MapReduce disiapkan dengan menyediakan Scan
instance dengan atribut rentang kunci.
Scan scan = new Scan(); scan.setCaching(1000); scan.setCacheBlocks(false); scan.setBatch(1000); scan.setMaxVersions(1); scan.setStartRow(Bytes.toBytes("2015-04-26")); scan.setStopRow(Bytes.toBytes("2015-04-27")); /* Setup the table mapper job */ TableMapReduceUtil.initTableMapperJob( tablename, scan, DataScanMapper.class, ImmutableBytesWritable.class, KeyValue.class, job, true, TableInputFormat.class ); …
Namun, pengaturan pekerjaan seperti itu menjadi tantangan untuk tabel pra-pemisahan yang diasinkan. Tombol baris mulai dan berhenti akan berbeda untuk setiap wilayah karena masing-masing memiliki garam yang unik. Dan kami tidak dapat menentukan beberapa rentang ke satu Scan
contoh.
Untuk mengatasi masalah ini, kita perlu melihat bagaimana tabel MapReduce bekerja. Umumnya, kerangka kerja MapReduce membuat satu tugas peta untuk membaca dan memproses setiap pemisahan input. Setiap pemisahan dibuat dalam InputFormat
basis kelas, dengan metode getSplits()
.
Dalam pekerjaan HBase table MapReduce, TableInputFormat
digunakan sebagai InputFormat
. Di dalam implementasi, getSplits()
metode diganti untuk mengambil kunci baris mulai dan berhenti dari Scan
contoh. Saat kunci baris mulai dan berhenti menjangkau beberapa wilayah, rentang dibagi dengan batas wilayah dan mengembalikan daftar TableSplit
objek yang mencakup rentang kunci pindai. Alih-alih didasarkan pada blok HDFS, TableSplit
s didasarkan pada wilayah. Dengan menimpa getSplits()
metode, kami dapat mengontrol TableSplit
.
Membangun TabelInputFormat Kustom
Untuk mengubah perilaku getSplits()
metode, kelas khusus yang memperluas TableInputFormat
Dibutuhkan. Tujuan getSplits()
di sini adalah untuk menutupi rentang kunci logis di setiap wilayah, membangun rentang kunci baris mereka dengan garam unik mereka. Kelas HTable menyediakan metode getStartEndKeys()
yang mengembalikan kunci baris awal dan akhir untuk setiap wilayah. Dari setiap tombol mulai, urai garam yang sesuai untuk wilayah tersebut.
Pair keys = table.getStartEndKeys(); for (int i = 0; i < keys.getFirst().length; i++) { // The first 3 bytes is the salt, for the first region, start key is empty, so apply “000” if (keys.getFirst()[i].length == 0) { regionSalt = "000"; } else { regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3); } … }
Konfigurasi Pekerjaan Melewati Rentang Kunci Logis
TableInputFormat
mengambil kunci mulai dan berhenti dari Scan
contoh. Karena kami tidak dapat menggunakan Scan
dalam pekerjaan MapReduce kami, kami dapat menggunakan Configuration
alih-alih melewati dua variabel ini dan hanya kunci mulai dan berhenti logis yang cukup baik (variabel dapat berupa tanggal atau informasi bisnis lainnya). getSplits()
metode memiliki JobContext
argumen, Instance konfigurasi dapat dibaca sebagai context.getConfiguration()
.
Di driver MapReduce:
Configuration conf = getConf(); conf = HBaseConfiguration.addHbaseResources(conf); conf.set("logical.scan.start", "2015-04-26"); conf.set("logical.scan.stop", "2015-04-27");
Dalam Custom TableInputFormat
:
@Override public List getSplits(JobContext context) throws IOException { conf = context.getConfiguration(); String scanStart = conf.get("logical.scan.start"); String scanStop = conf.get("logical.scan.stop"); … }
Rekonstruksi Rentang Kunci Asin berdasarkan Wilayah
Sekarang setelah kita memiliki salt dan kunci start/stop logis untuk setiap region, kita dapat membangun kembali rentang kunci baris yang sebenarnya.
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart); byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);
Membuat TableSplit untuk Setiap Wilayah
Dengan rentang kunci baris, sekarang kita dapat menginisialisasi TableSplit
contoh untuk wilayah tersebut.
List splits = new ArrayList(keys.getFirst().length); for (int i = 0; i < keys.getFirst().length; i++) { … byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart); byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop); InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation); splits.add(split); }
Satu hal lagi yang harus dilihat adalah lokalitas data. Kerangka kerja menggunakan informasi lokasi di setiap pemisahan input untuk menetapkan tugas peta di host lokalnya. Untuk TableInputFormat
kami , kami menggunakan metode getTableRegionLocation()
untuk mengambil lokasi region yang menyajikan kunci baris.
Lokasi ini kemudian diteruskan ke TableSplit
konstruktor. Ini akan memastikan bahwa pemeta yang memproses pemisahan tabel berada di server wilayah yang sama. Satu metode, disebut DNS.reverseDns()
, memerlukan alamat untuk server nama HBase. Atribut ini disimpan dalam konfigurasi “hbase.nameserver.address
“.
this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null); … public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException { HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress(); InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress(); String regionLocation; try { regionLocation = reverseDNS(regionAddress); } catch (NamingException e) { regionLocation = regionServerAddress.getHostname(); } return regionLocation; } protected String reverseDNS(InetAddress ipAddress) throws NamingException { String hostName = this.reverseDNSCacheMap.get(ipAddress); if (hostName == null) { hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); this.reverseDNSCacheMap.put(ipAddress, hostName); } return hostName; }
Kode lengkap getSplits
akan terlihat seperti ini:
@Override public List getSplits(JobContext context) throws IOException { conf = context.getConfiguration(); table = getHTable(conf); if (table == null) { throw new IOException("No table was provided."); } // Get the name server address and the default value is null. this.nameServer = conf.get("hbase.nameserver.address", null); String scanStart = conf.get("region.scan.start"); String scanStop = conf.get("region.scan.stop"); Pair keys = table.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { throw new RuntimeException("At least one region is expected"); } List splits = new ArrayList(keys.getFirst().length); for (int i = 0; i < keys.getFirst().length; i++) { String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]); String regionSalt = null; if (keys.getFirst()[i].length == 0) { regionSalt = "000"; } else { regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3); } byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart); byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop); InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation); splits.add(split); } log.info("Total table splits: " + splits.size()); return splits; }
Gunakan Custom TableInoutFormat di Driver MapReduce
Sekarang kita perlu mengganti TableInputFormat
dengan build kustom yang kami gunakan untuk tabel penyiapan pekerjaan MapReduce.
Configuration conf = getConf(); conf = HBaseConfiguration.addHbaseResources(conf); HTableInterface status_table = new HTable(conf, status_tablename); conf.set("logical.scan.start", "2015-04-26"); conf.set("logical.scan.stop", "2015-04-27"); Scan scan = new Scan(); scan.setCaching(1000); scan.setCacheBlocks(false); scan.setBatch(1000); scan.setMaxVersions(1); /* Setup the table mapper job */ TableMapReduceUtil.initTableMapperJob( tablename, scan, DataScanMapper.class, ImmutableBytesWritable.class, KeyValue.class, job, true, MultiRangeTableInputFormat.class );
Pendekatan kustom TableInputFormat
menyediakan kemampuan pemindaian yang efisien dan terukur untuk tabel HBase yang dirancang untuk menggunakan garam untuk beban data yang seimbang. Karena pemindaian dapat melewati kunci baris apa pun yang tidak terkait, terlepas dari seberapa besar tabelnya, kompleksitas pemindaian hanya terbatas pada ukuran data target. Dalam sebagian besar kasus penggunaan, ini dapat menjamin waktu pemrosesan yang relatif konsisten seiring bertambahnya tabel.