HDInsight: How to use the Spark-HBase connector

Apache Spark is an open-source parallel processing framework that supports in-memory processing to boost the performance of big-data analytic applications. Azure HDInsight offers a fully managed Spark service with many benefits.

Apache HBase is an open-source NoSQL database built on Hadoop: a distributed, scalable big-data store that provides real-time read/write access to large datasets. HDInsight HBase is offered as a managed cluster that is integrated into the Azure environment.

Spark-HBase Connector

The Spark-HBase Connector provides an easy way to store and access data in HBase clusters from Spark jobs. HBase handles very large data volumes well, so existing Spark customers should consider it as a storage option. Similarly, customers who already have HDInsight HBase clusters and want to access their data from Spark jobs do not need to move that data to another storage medium. In both cases, the connector is useful.

Steps to use the connector

Currently, the connector has to be installed manually on the Spark cluster. We plan to ship it with HDInsight clusters soon. The installation takes four steps:

  • Step 1: Create a VNET.
  • Step 2: Create the Spark and HBase clusters in the same subnet or in different subnets of the same VNET.
  • Step 3: Copy hbase-site.xml from HBase cluster to your Spark cluster.
  • Step 4: Install the connector.
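
Step 3 matters because HBase clients locate the cluster through the ZooKeeper quorum named in hbase-site.xml. As a minimal sketch, assuming the file uses the standard hbase.zookeeper.quorum property, the copied file can be checked before it is placed in the Spark configuration folder. The function name has_zk_quorum is hypothetical and is not part of the connector.

```shell
#!/usr/bin/env bash
# Hypothetical check (not part of the connector): succeed only if the given
# hbase-site.xml exists and declares the hbase.zookeeper.quorum property.
has_zk_quorum() {
  local file="$1"
  # -F treats the pattern as a fixed string, -q suppresses output.
  [ -f "$file" ] && grep -qF '<name>hbase.zookeeper.quorum</name>' "$file"
}
```

For example, `has_zk_quorum hbase-site.xml && sudo cp hbase-site.xml /etc/spark/conf/` copies the file only when the property is present.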

The detailed steps are as follows.

  • Create an Azure virtual network. The VNET can be created from the Azure portal.
  • Set up the Spark and HBase clusters in that VNET. Instructions are available for both Linux and Windows clusters. The connector is tested with Spark 1.6.2 (HDI 3.5) clusters. Please do not use it with Spark 2.0 clusters.
  • On the Spark cluster, install or upgrade Maven (if needed) so that you can compile the package.
    sudo apt-get install maven
  • Copy the package source code from the Spark-HBase Connector repository to the Spark cluster. Also copy the HBase configuration file, hbase-site.xml, from the HBase cluster into the Spark configuration folder.
    sudo cp hbase-site.xml /etc/spark/conf/
  • Compile the connector from the root of its source directory (shc-master in this example).
    mvn package -DskipTests
  • Run spark-submit.
    $SPARK_HOME/bin/spark-submit --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn-client --num-executors 2 --driver-memory 512m --executor-memory 512m --executor-cores 1 --jars /usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/current/hbase-client/lib/hbase-client.jar,/usr/hdp/current/hbase-client/lib/hbase-common.jar,/usr/hdp/current/hbase-client/lib/hbase-server.jar,/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar,/usr/hdp/current/hbase-client/lib/hbase-protocol.jar --files /usr/hdp/current/spark-client/conf/hbase-site.xml /home/hdiuser/shc-master/target/hbase-spark-connector-1.0.0.jar
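
The spark-submit command above lists every HBase client jar by hand, which is easy to mistype. As a rough sketch, the comma-separated value for --jars could be assembled from a directory and a list of jar names. The helper name join_jars is hypothetical; the directory and jar names in the usage note are the ones used in this post.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the connector): join the named jars found
# in a directory into the comma-separated form that --jars expects.
join_jars() {
  local dir="$1"; shift
  local out="" name
  for name in "$@"; do
    if [ -f "$dir/$name" ]; then
      # Append with a comma only when the list is already non-empty.
      out="${out:+$out,}$dir/$name"
    else
      # Report missing jars on stderr instead of passing a bad path to Spark.
      echo "warning: $dir/$name not found" >&2
    fi
  done
  printf '%s\n' "$out"
}
```

A call such as `JARS=$(join_jars /usr/hdp/current/hbase-client/lib hbase-client.jar hbase-common.jar hbase-server.jar hbase-protocol.jar)` would then let the command use `--jars "$JARS"`.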

Sample Program

A sample notebook is checked in at: https://github.com/AnunayTiwari/Notebook-Spark-Jupyter-SampleProgram-HBaseSparkConnector/blob/master/SparkHBaseConnectorSampleProgram.ipynb
Note: Set the “livy.file.local-dir-whitelist” configuration parameter to an appropriate value so that the notebook can access local paths on the Spark cluster. In this case, it is set to “/”. This is needed because of a known issue in HDInsight Spark clusters. Once the issue is fixed, the parameter will no longer need to be set explicitly.
You can also launch the Spark shell with the connector as follows and run the commands interactively.

  • Spark Shell
  • $SPARK_HOME/bin/spark-shell --packages zhzhan:shc:0.0.11-1.6.1-s_2.10 --driver-class-path /usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase-client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar:/etc/hbase/conf --executor-memory 512m --executor-cores 1 --files /usr/hdp/current/hbase-client/conf/hbase-site.xml --jars /usr/hdp/current/phoenix-client/phoenix-server.jar

The important commands are as follows.

  • Define Class and Object
  • In [5]:
    case class HBaseRecordAirline(col0: String, Year: Int, Quarter: Int, Month: Int, DayofMonth: Int, DayOfWeek: Int, FlightDate: Int, UniqueCarrier: String, AirlineID: String)
    defined class HBaseRecordAirline
    In [6]:
    object HBaseRecordAirlineTest {
      def apply(i: Int): HBaseRecordAirline = {
        val s = s"""row${"%03d".format(i)}"""
        // Placeholder field values (assumed for illustration; not the original data)
        HBaseRecordAirline(s, 2016, 1, 1, 1, 1, 20160101, "AA", s"airline$i")
      }
    }
    defined module HBaseRecordAirlineTest
  • Define the catalog: The catalog defines the mapping between the Spark DataFrame columns and the HBase table (namespace, table name, row key, and the column family and qualifier of each column).
  • In [7]:
    val cat = s"""{
                 |"table":{"namespace":"default", "name":"airdelaydata_scv_Test1"},
                 |"rowkey":"key",
                 |"columns":{
                   |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                   |"Year":{"cf":"Year", "col":"Year", "type":"int"},
                   |"Quarter":{"cf":"Quarter", "col":"Quarter", "type":"int"},
                   |"Month":{"cf":"Month", "col":"Month", "type":"int"},
                   |"DayofMonth":{"cf":"DayofMonth", "col":"DayofMonth", "type":"int"},
                   |"DayOfWeek":{"cf":"DayOfWeek", "col":"DayOfWeek", "type":"int"},
                   |"FlightDate":{"cf":"FlightDate", "col":"FlightDate", "type":"int"},
                   |"UniqueCarrier":{"cf":"UniqueCarrier", "col":"UniqueCarrier", "type":"string"},
                   |"AirlineID":{"cf":"AirlineID", "col":"AirlineID", "type":"string"}
                 |}
               |}""".stripMargin
    cat: String =
    {
    "table":{"namespace":"default", "name":"airdelaydata_scv_Test1"},
    "rowkey":"key",
    "columns":{
    "col0":{"cf":"rowkey", "col":"key", "type":"string"},
    "Year":{"cf":"Year", "col":"Year", "type":"int"},
    "Quarter":{"cf":"Quarter", "col":"Quarter", "type":"int"},
    "Month":{"cf":"Month", "col":"Month", "type":"int"},
    "DayofMonth":{"cf":"DayofMonth", "col":"DayofMonth", "type":"int"},
    "DayOfWeek":{"cf":"DayOfWeek", "col":"DayOfWeek", "type":"int"},
    "FlightDate":{"cf":"FlightDate", "col":"FlightDate", "type":"int"},
    "UniqueCarrier":{"cf":"UniqueCarrier", "col":"UniqueCarrier", "type":"string"},
    "AirlineID":{"cf":"AirlineID", "col":"AirlineID", "type":"string"}
    }
    }
  • Write Data: Given a data frame with the specified schema, this creates an HBase table with 5 regions and saves the data frame in it. If HBaseTableCatalog.newTable is not specified, the table has to be created beforehand. Here, data is a collection of HBaseRecordAirline records, for example one built by calling HBaseRecordAirlineTest(i) over a range of i.
  • In [11]:
    import org.apache.spark.sql.execution.datasources.hbase._
    sc.parallelize(data).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
  • Define DataFrame
  • In [12]:
    import org.apache.spark.sql.DataFrame
    def withCatalog(cat: String): DataFrame = {
      sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog -> cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
    }
    withCatalog: (cat: String)org.apache.spark.sql.DataFrame
    In [13]:
    val df = withCatalog(cat)
    df: org.apache.spark.sql.DataFrame = [UniqueCarrier: string, Month: int, col0: string, Quarter: int, FlightDate: int, AirlineID: string, DayOfWeek: int, DayofMonth: int, Year: int]
  • SQL Support
  • In [14]:
    // Register the DataFrame as a temporary table so that SQL queries can refer to it as table1
    df.registerTempTable("table1")
    In [15]:
    val c = sqlContext.sql("select AirlineID from table1")
    c: org.apache.spark.sql.DataFrame = [AirlineID: string]
    In [16]:
    |   row000|
    |   row001|
    |   row002|
    |   row003|
    |   row004|
    |   row005|
    |   row006|
    |   row007|
    |   row008|
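
The row keys in the output above (row000, row001, and so on) come from the "%03d" format used in HBaseRecordAirlineTest. HBase keeps rows sorted by the bytes of the row key, so zero-padding keeps text order and numeric order aligned. The shell sketch below illustrates the effect with a plain byte-wise sort; the function name row_key is hypothetical and only mirrors the Scala expression.

```shell
#!/usr/bin/env bash
# Hypothetical helper mirroring the Scala expression s"""row${"%03d".format(i)}""":
# it zero-pads the index to three digits.
row_key() {
  printf 'row%03d\n' "$1"
}

# Without padding, a byte-wise sort places row10 before row2.
printf 'row%d\n' 2 10 | LC_ALL=C sort

# With padding, byte-wise order matches numeric order (row002, then row010).
for i in 2 10; do row_key "$i"; done | LC_ALL=C sort
```

The same reasoning applies to any numeric component of an HBase row key: choose a fixed width that covers the largest expected value.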

Please refer to the Spark-HBase Connector project for more information on the connector.
I am working on using the connector for the Event Hub scenario, which will make it possible to persist events from Event Hub to HBase. I will cover this scenario in my next blog post.
