HBase
 sql >> Teknologi Basis Data >  >> NoSQL >> HBase

Spark-on-HBase:konektor HBase berbasis DataFrame

Pos blog ini dipublikasikan di Hortonworks.com sebelum merger dengan Cloudera. Beberapa tautan, sumber daya, atau referensi mungkin tidak lagi akurat.

Kami dengan bangga mengumumkan pratinjau teknis Konektor Spark-HBase, yang dikembangkan oleh Hortonworks bekerja sama dengan Bloomberg.

Konektor Spark-HBase memanfaatkan API Sumber Data (SPARK-3247) yang diperkenalkan di Spark-1.2.0. Ini menjembatani kesenjangan antara penyimpanan Nilai Kunci HBase sederhana dan kueri SQL relasional yang kompleks dan memungkinkan pengguna untuk melakukan analisis data yang kompleks di atas HBase menggunakan Spark. HBase DataFrame adalah Spark DataFrame standar, dan mampu berinteraksi dengan sumber data lain seperti Hive, ORC, Parket, JSON, dll.

Latar Belakang

Ada beberapa konektor Spark HBase open source yang tersedia baik sebagai paket Spark, sebagai proyek independen atau di trunk HBase.

Spark telah pindah ke Dataset/DataFrame API, yang menyediakan pengoptimalan rencana kueri bawaan. Sekarang, pengguna akhir lebih suka menggunakan antarmuka berbasis DataFrames/Datasets.

Konektor HBase di batang HBase memiliki dukungan yang kaya di tingkat RDD, mis. BulkPut, dll, tetapi dukungan DataFrame-nya tidak begitu kaya. Konektor trunk HBase bergantung pada HadoopRDD standar dengan TableInputFormat bawaan HBase memiliki beberapa keterbatasan kinerja. Selain itu, BulkGet yang dilakukan di driver mungkin merupakan satu titik kegagalan.

Ada beberapa alternatif implementasi lainnya. Ambil Spark-SQL-on-HBase sebagai contoh. Ini menerapkan teknik pengoptimalan khusus yang sangat canggih dengan menyematkan rencana pengoptimalan kuerinya sendiri di dalam mesin Spark Catalyst standar, mengirimkan RDD ke HBase dan melakukan tugas-tugas rumit, seperti agregasi parsial, di dalam koprosesor HBase. Pendekatan ini mampu mencapai kinerja tinggi, tetapi sulit dipertahankan karena kompleksitasnya dan evolusi Spark yang cepat. Juga mengizinkan kode arbitrer untuk dijalankan di dalam koprosesor dapat menimbulkan risiko keamanan.

Konektor Spark-on-HBase (SHC) telah dikembangkan untuk mengatasi potensi kemacetan dan kelemahan ini. Ini mengimplementasikan API Spark Datasource standar, dan memanfaatkan mesin Spark Catalyst untuk pengoptimalan kueri. Secara paralel, RDD dibangun dari awal alih-alih menggunakan TableInputFormat untuk mencapai kinerja yang tinggi. Dengan RDD yang disesuaikan ini, semua teknik penting dapat diterapkan dan diimplementasikan sepenuhnya, seperti pemangkasan partisi, pemangkasan kolom, penurunan predikat, dan lokalitas data. Desainnya membuat perawatan menjadi sangat mudah, sekaligus mencapai keseimbangan yang baik antara kinerja dan kesederhanaan.

Arsitektur

Kami menganggap Spark dan HBase di-deploy dalam cluster yang sama, dan eksekutor Spark ditempatkan bersama dengan server region, seperti yang diilustrasikan pada gambar di bawah.

Gambar 1. Arsitektur Konektor Spark-on-HBase

Pada tingkat tinggi, konektor memperlakukan Pindai dan Dapatkan dengan cara yang sama, dan kedua tindakan dilakukan di pelaksana. Driver memproses kueri, menggabungkan pemindaian/mendapatkan berdasarkan metadata wilayah, dan menghasilkan tugas per wilayah. Tugas dikirim ke eksekutor pilihan yang berlokasi bersama dengan server wilayah, dan dilakukan secara paralel di eksekutor untuk mencapai lokalitas dan konkurensi data yang lebih baik. Jika suatu wilayah tidak menyimpan data yang diperlukan, server wilayah tersebut tidak diberi tugas apa pun. Tugas dapat terdiri dari beberapa Pemindaian dan BulkGets, dan permintaan data oleh tugas diambil hanya dari satu server wilayah, dan server wilayah ini juga akan menjadi preferensi lokalitas untuk tugas tersebut. Perhatikan bahwa driver tidak terlibat dalam pelaksanaan pekerjaan yang sebenarnya kecuali untuk tugas-tugas penjadwalan. Ini menghindari pengemudi menjadi penghambat.

Katalog Tabel

Untuk membawa tabel HBase sebagai tabel relasional ke dalam Spark, kami mendefinisikan pemetaan antara tabel HBase dan Spark, yang disebut Katalog Tabel. Ada dua bagian penting dari katalog ini. Salah satunya adalah definisi rowkey dan yang lainnya adalah pemetaan antara kolom tabel di Spark dan keluarga kolom dan kualifikasi kolom di HBase. Silakan merujuk ke bagian Penggunaan untuk detailnya.

Dukungan Avro Asli

Konektor mendukung format Avro secara asli, karena merupakan praktik yang sangat umum untuk menyimpan data terstruktur ke dalam HBase sebagai larik byte. Pengguna dapat menyimpan catatan Avro ke HBase secara langsung. Secara internal, skema Avro dikonversi ke tipe data Spark Catalyst asli secara otomatis. Perhatikan bahwa kedua bagian nilai kunci dalam tabel HBase dapat didefinisikan dalam format Avro. Silakan merujuk ke contoh/kasus uji di repo untuk penggunaan yang tepat.

Tekan Turun Predikat

Konektor hanya mengambil kolom yang diperlukan dari server wilayah untuk mengurangi overhead jaringan dan menghindari pemrosesan yang berlebihan di mesin Spark Catalyst. Filter HBase standar yang ada digunakan untuk melakukan push-down predikat tanpa memanfaatkan kemampuan koprosesor. Karena HBase tidak mengetahui tipe data kecuali larik byte, dan ketidakkonsistenan urutan antara tipe primitif Java dan larik byte,  kami harus memproses terlebih dahulu kondisi filter sebelum menyetel filter dalam operasi Pindai untuk menghindari kehilangan data. Di dalam server wilayah, catatan yang tidak cocok dengan kondisi kueri akan difilter.

Pemangkasan Partisi

Dengan mengekstrak kunci baris dari predikat, kami membagi Scan/BulkGet menjadi beberapa rentang yang tidak tumpang tindih, hanya server wilayah yang memiliki data yang diminta yang akan melakukan Scan/BulkGet. Saat ini, pemangkasan partisi dilakukan pada dimensi pertama dari kunci baris. Misalnya, jika kunci baris adalah “key1:key2:key3”, pemangkasan partisi hanya akan didasarkan pada “key1”. Perhatikan bahwa kondisi WHERE perlu didefinisikan dengan hati-hati. Jika tidak, pemangkasan partisi mungkin tidak berlaku. Misalnya, WHERE rowkey1> “abc” OR column =“xyz” (di mana rowkey1 adalah dimensi pertama dari rowkey, dan column adalah kolom hbase reguler) akan menghasilkan pemindaian penuh, karena kita harus mencakup semua rentang karena dari ATAU logika.

Lokalitas Data

Ketika pelaksana Spark ditempatkan bersama dengan server wilayah HBase, lokalitas data dicapai dengan mengidentifikasi lokasi server wilayah, dan melakukan upaya terbaik untuk menemukan tugas bersama dengan server wilayah. Setiap pelaksana melakukan Scan/BulkGet pada bagian data yang terletak bersama di host yang sama.

Pindai dan Dapatkan Massal

Kedua operator ini diekspos ke pengguna dengan menentukan WHERE CLAUSE, mis., WHERE kolom> x dan kolom untuk pemindaian dan kolom WHERE =x lupa. Operasi dilakukan di pelaksana, dan driver hanya membangun operasi ini. Secara internal mereka dikonversi untuk memindai dan/atau mendapatkan, dan Iterator[Row] dikembalikan ke mesin katalis untuk pemrosesan lapisan atas.

Penggunaan

Berikut ini mengilustrasikan prosedur dasar tentang cara menggunakan konektor. Untuk detail lebih lanjut dan kasus penggunaan lanjutan, seperti Avro dan dukungan kunci komposit, silakan merujuk ke contoh di repositori.

1) Tentukan katalog untuk pemetaan skema:

[code language="scala"]def catalog =s"""{        |"table":{"namespace":"default", "name":"table1"},       |"rowkey":"key" ,        |"columns":{         |"col0":{"cf":"rowkey", "col":"key", "type":"string"},         |"col1":{"cf":"cf1 ", "col":"col1", "type":"boolean"},          |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"},         |"col4":{"cf":"cf4", "col":" col4", "type":"int"},         |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},         |"col6":{" cf":"cf6", "col":"col6", "type":"smallint"},         |"col7":{"cf":"cf7", "col":"col7", "type":"string"},          |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}       |}      |}""".stripMargin[/code] 

2) Siapkan data dan isi tabel HBase:
kelas kasus HBaseRecord(col0:String, col1:Boolean,col2:Double, col3:Float,col4:Int,       col5:Long, col6:Short, col7:String, col8:Byte)

objek HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}””””      HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat,  i, i.toLong, i.toShort,  s”String$i:$t”,      i.toByte) }}

val data =(0 hingga 255).map { i =>  HBaseRecord(i, “ekstra”)}

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> “5”))
 .format(“org.apache.spark. sql.execution.datasources.hbase”)
 .save()
 
3) Muat DataFrame:
def withCatalog(cat:String):DataFrame ={
 sqlContext
 .read
 .options(Map(HBaseTableCatalog.tableCatalog->cat))
 .format( “org.apache.spark.sql.execution.datasources.hbase”)
 .load()
}

val df =withCatalog(catalog)

4) Kueri terintegrasi bahasa:
val s =df.filter((($”col0″ <=“row050″ &&$”col0”> “row040”) ||
 $”col0″ ===“row005” ||
 $”col0″ ===“row020” ||
 $”col0″ === “r20” ||
 $”col0″ <=“row005”) &&
 ($”col4″ ===1 ||
 $”col4″ ===42))
 .select(“col0”, “col1”, “col4”)
s .tampilkan

5) kueri SQL:
df.registerTempTable(“tabel”)
sqlContext.sql(“select count(col1) from table”).show

Mengonfigurasi Paket Spark

Pengguna dapat menggunakan konektor Spark-on-HBase sebagai paket Spark standar. Untuk menyertakan paket dalam aplikasi Spark Anda, gunakan:

spark-shell, pyspark, atau spark-submit

> $SPARK_HOME/bin/spark-shell –paket zhzhan:shc:0.0.11-1.6.1-s_2.10

Pengguna juga dapat menyertakan paket sebagai dependensi dalam file SBT Anda. Formatnya adalah spark-package-name:version

spDependencies +=“zhzhan/shc:0.0.11-1.6.1-s_2.10”

Berjalan di Cluster Aman

Untuk berjalan di cluster yang diaktifkan Kerberos, pengguna harus menyertakan toples terkait HBase ke dalam classpath karena pengambilan dan pembaruan token HBase dilakukan oleh Spark, dan tidak bergantung pada konektor. Dengan kata lain, pengguna perlu menginisiasi environment dengan cara biasa, baik melalui kinit maupun dengan menyediakan principal/keytab. Contoh berikut menunjukkan cara menjalankan dalam cluster aman dengan mode yarn-client dan yarn-cluster. Perhatikan bahwa SPARK_CLASSPATH harus disetel untuk kedua mode, dan toples contoh hanyalah pengganti untuk Spark.

ekspor SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

Misalkan hrt_qa adalah akun tanpa kepala, pengguna dapat menggunakan perintah berikut untuk kinit:

kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –paket zhzhan:shc:0.0.11-1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

Menggabungkan Semuanya

Kami baru saja memberikan gambaran singkat tentang bagaimana HBase mendukung Spark di tingkat DataFrame. Dengan aplikasi DataFrame API Spark dapat bekerja dengan data yang disimpan di tabel HBase semudah data yang disimpan di sumber data lainnya. Dengan fitur baru ini, data dalam tabel HBase dapat dengan mudah digunakan oleh aplikasi Spark dan alat interaktif lainnya, mis. pengguna dapat menjalankan kueri SQL kompleks di atas tabel HBase di dalam Spark, melakukan penggabungan tabel terhadap Dataframe, atau berintegrasi dengan Spark Streaming untuk mengimplementasikan sistem yang lebih rumit.

Apa Selanjutnya?

Saat ini, konektor di-host di repo Hortonworks, dan diterbitkan sebagai paket Spark. Itu sedang dalam proses migrasi ke trunk Apache HBase. Selama migrasi, kami mengidentifikasi beberapa bug kritis di trunk HBase, dan mereka akan diperbaiki bersama dengan penggabungan. Kerja komunitas dilacak oleh payung HBase JIRA HBASE-14789, termasuk HBASE-14795 dan HBASE-14796  untuk mengoptimalkan arsitektur komputasi yang mendasari untuk Scan dan BulkGet,  HBASE-14801 untuk menyediakan antarmuka pengguna JSON untuk kemudahan penggunaan, HBASE-15336 untuk jalur penulisan DataFrame, HBASE-15334 untuk dukungan Avro, HBASE-15333  untuk mendukung tipe primitif Java, seperti short, int, long, float, dan double, dll., HBASE-15335 untuk mendukung kunci baris komposit, dan HBASE-15572 untuk menambahkan semantik cap waktu opsional. Kami berharap dapat memproduksi konektor versi masa depan yang membuat konektor lebih mudah digunakan.

Penghargaan

Kami ingin mengucapkan terima kasih kepada Hamel Kothari, Sudarshan Kadambi dan tim Bloomberg dalam membimbing kami dalam pekerjaan ini dan juga membantu kami memvalidasi pekerjaan ini. Kami juga ingin berterima kasih kepada komunitas HBase karena telah memberikan umpan balik mereka dan menjadikannya lebih baik. Terakhir, pekerjaan ini telah memanfaatkan pelajaran dari integrasi Spark HBase sebelumnya dan kami ingin berterima kasih kepada pengembang mereka karena telah membuka jalan.

Referensi:

SHC:https://github.com/hortonworks/shc-release

Paket percikan:http://spark-packages.org/package/zhzhan/shc

Apache HBase: https://hbase.apache.org/

Apache Spark:http://spark.apache.org/


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Apa itu Hadoop Cluster? Praktik Terbaik untuk Membangun Cluster Hadoop

  2. Apa itu Pemadatan HBase?

  3. Perbedaan Antara InputSplit vs Blok di Hadoop

  4. Hadoop MapReduce Tutorial untuk Pemula

  5. Transformasi Digital adalah Perjalanan Data Dari Ujung ke Wawasan