HDInsight - How to use the Spark-HBase connector

Apache Spark is an open-source parallel processing framework that supports in-memory processing to boost the performance of big-data analytic applications. Azure HDInsight offers a fully managed Spark service with many benefits.

Apache HBase is an open-source NoSQL database built on Hadoop: a distributed, scalable big data store that provides real-time read/write access to large datasets. HDInsight HBase is offered as a managed cluster that is integrated into the Azure environment.

Spark-HBase Connector

The Spark-HBase Connector provides an easy way to store and access data in HBase clusters from Spark jobs. HBase is well suited to workloads with very large data volumes, so existing Spark customers should consider it as a storage option. Likewise, customers who already run HDInsight HBase clusters can access that data from Spark jobs without moving it to another storage medium. In both cases, the connector is useful.

Steps to use connector

To use the latest connector, check out the source code with git and build it yourself; otherwise, you can use the binary JAR directly from the Hortonworks repo.

  • Step 1: Create a VNET.
  • Step 2: Create the Spark and HBase clusters in the same or different subnets of the same VNET.
  • Step 3: Copy hbase-site.xml from the HBase cluster to your Spark cluster.
  • Step 4: Run spark-submit with "--packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/"

The detailed steps are as follows.

    • Create an Azure virtual network. The VNET can be created from the Azure portal.
    • Set up the Spark and HBase clusters. Instructions are available for Linux and Windows. The connector is tested with Spark 2.1 (HDI 3.6) clusters.
    • Copy the HBase configuration XML file to the Spark configuration folder.
 sudo cp hbase-site.xml /etc/spark2/conf/
    • Run Spark Submit
 $SPARK_HOME/bin/spark-submit --verbose --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn-client --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/ --files /etc/spark2/conf/hbase-site.xml /To/your/application/jar

For example, you can run the examples that ship with the package by using the following command.

 $SPARK_HOME/bin/spark-submit --verbose --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn-cluster --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/ --files /etc/spark2/conf/hbase-site.xml /home/sshuser/shc/examples/target/shc-examples-1.1.1-2.1-s_2.11-SNAPSHOT.jar

Sample Program

    • Spark Shell
 $SPARK_HOME/bin/spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/

The important commands are as follows.

    • Include packages
 
import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}
val sparkConf = new SparkConf().setAppName("HBaseTestApp")
import spark.sqlContext.implicits._
    • Define Class and Object
 case class HBaseRecordAirline(col0: String,Year: Int,Quarter: Int,Month: Int,DayofMonth: Int,DayOfWeek: Int,FlightDate: Int,UniqueCarrier: String,AirlineID: String)
defined class HBaseRecordAirline
object HBaseRecordAirlineTest {
  def apply(i: Int): HBaseRecordAirline = {
    val s = s"""row${"%03d".format(i)}"""
    HBaseRecordAirline(s, i, i, i, i, i, i, s, s)
  }
}
defined module HBaseRecordAirlineTest
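To make the record layout concrete, here is a minimal standalone sketch in plain Scala (no Spark needed) that repeats the definitions above and prints what the factory produces for one index. It uses the `f` interpolator as a shorter equivalent of the `"%03d".format(i)` call; that substitution is a choice made for this sketch, not something the original session does.

```scala
// Same record shape as in the shell session above.
case class HBaseRecordAirline(
  col0: String, Year: Int, Quarter: Int, Month: Int, DayofMonth: Int,
  DayOfWeek: Int, FlightDate: Int, UniqueCarrier: String, AirlineID: String)

object HBaseRecordAirlineTest {
  // Builds a synthetic record: every Int column is i,
  // every String column is the zero-padded key.
  def apply(i: Int): HBaseRecordAirline = {
    val s = f"row$i%03d" // pads the index to three digits
    HBaseRecordAirline(s, i, i, i, i, i, i, s, s)
  }
}

val sample = HBaseRecordAirlineTest(3)
println(sample.col0) // prints "row003"
println(sample.Year) // prints 3
```

The zero-padding matters because HBase orders row keys lexicographically; without it, a key such as row10 would sort before row2.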
    • Define the catalog: The catalog defines the mapping between the Spark DataFrame schema and the HBase table.
 
val cat = s"""{
             |"table":{"namespace":"default", "name":"airdelaydata_scv_Test1"},
             |"rowkey":"key",
             |"columns":{
             |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
             |"Year":{"cf":"Year", "col":"Year", "type":"int"},
             |"Quarter":{"cf":"Quarter", "col":"Quarter", "type":"int"},
             |"Month":{"cf":"Month", "col":"Month", "type":"int"},
             |"DayofMonth":{"cf":"DayofMonth", "col":"DayofMonth", "type":"int"},
             |"DayOfWeek":{"cf":"DayOfWeek", "col":"DayOfWeek", "type":"int"},
             |"FlightDate":{"cf":"FlightDate", "col":"FlightDate", "type":"int"},
             |"UniqueCarrier":{"cf":"UniqueCarrier", "col":"UniqueCarrier", "type":"string"},
             |"AirlineID":{"cf":"AirlineID", "col":"AirlineID", "type":"string"}
             |}
             |}""".stripMargin
cat: String = 
{
"table":{"namespace":"default", "name":"airdelaydata_scv_Test1"},
"rowkey":"key",
"columns":{
"col0":{"cf":"rowkey", "col":"key", "type":"string"},
"Year":{"cf":"Year", "col":"Year", "type":"int"},
"Quarter":{"cf":"Quarter", "col":"Quarter", "type":"int"},
"Month":{"cf":"Month", "col":"Month", "type":"int"},
"DayofMonth":{"cf":"DayofMonth", "col":"DayofMonth", "type":"int"},
"DayOfWeek":{"cf":"DayOfWeek", "col":"DayOfWeek", "type":"int"},
"FlightDate":{"cf":"FlightDate", "col":"FlightDate", "type":"int"},
"UniqueCarrier":{"cf":"UniqueCarrier", "col":"UniqueCarrier", "type":"string"},
"AirlineID":{"cf":"AirlineID", "col":"AirlineID", "type":"string"}
}
}
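Because every entry in the catalog has the same shape, the JSON can also be generated instead of typed by hand. The following is a rough sketch in plain Scala; `ColumnSpec` and `buildCatalog` are hypothetical names introduced here for illustration and are not part of the connector. It assumes the layout used above, where the row key is declared as a column with the special column family "rowkey".

```scala
// Hypothetical helper type for this sketch; not part of the connector.
case class ColumnSpec(field: String, cf: String, col: String, dataType: String)

// Renders a catalog string with the same layout as the hand-written one above.
def buildCatalog(namespace: String, table: String, rowkey: String,
                 columns: Seq[ColumnSpec]): String = {
  val cols = columns.map { c =>
    s""""${c.field}":{"cf":"${c.cf}", "col":"${c.col}", "type":"${c.dataType}"}"""
  }
  Seq(
    "{",
    s""""table":{"namespace":"$namespace", "name":"$table"},""",
    s""""rowkey":"$rowkey",""",
    """"columns":{""",
    cols.mkString(",\n"),
    "}",
    "}"
  ).mkString("\n")
}

// Rebuild the airline catalog: one column family per field, as in the original.
val intCols = Seq("Year", "Quarter", "Month", "DayofMonth", "DayOfWeek", "FlightDate")
val strCols = Seq("UniqueCarrier", "AirlineID")
val columns =
  ColumnSpec("col0", "rowkey", "key", "string") +:
    (intCols.map(n => ColumnSpec(n, n, n, "int")) ++
      strCols.map(n => ColumnSpec(n, n, n, "string")))

val generated = buildCatalog("default", "airdelaydata_scv_Test1", "key", columns)
println(generated)
```

The resulting string can be passed wherever `cat` is used in the rest of the session. Generating it keeps the field names in one place, which helps when the case class and the catalog have to stay in sync.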
    • Write Data: Given a DataFrame with the specified schema, this creates an HBase table with 5 regions and saves the DataFrame in it. Note that if HBaseTableCatalog.newTable is not specified, the table has to be created beforehand.
 
val data = (0 to 8).map { i =>HBaseRecordAirlineTest(i)}
sc.parallelize(data).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    • SQL Support
 
def withCatalog(cat: String): DataFrame = {spark.sqlContext.read.options(Map(HBaseTableCatalog.tableCatalog->cat)).format("org.apache.spark.sql.execution.datasources.hbase").load()}

val df = withCatalog(cat)

df.createOrReplaceTempView("table1")

val c = spark.sqlContext.sql("select AirlineID from table1")

c.show()
+---------+
|AirlineID|
+---------+
|   row000|
|   row001|
|   row002|
|   row003|
|   row004|
|   row005|
|   row006|
|   row007|
|   row008|
+---------+
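The same data can also be queried through the DataFrame API instead of SQL. The sketch below is an illustration only: `airlinesUpTo` is a hypothetical helper name, and it relies on nothing beyond standard Spark SQL calls (`filter`, `select`, `functions.col`). It assumes a DataFrame with the `col0` and `AirlineID` columns defined in the catalog above.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: keep rows whose row key is at or below lastKey
// (string comparison) and project the key and AirlineID columns.
def airlinesUpTo(df: DataFrame, lastKey: String): DataFrame =
  df.filter(col("col0") <= lastKey).select("col0", "AirlineID")

// In the shell session above this could be called as:
//   airlinesUpTo(withCatalog(cat), "row005").show()
```

The comparison is on strings, so it only behaves like a numeric range because the row keys are zero-padded.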

There is also a Jupyter notebook example that shows how to use the Spark-HBase connector.