Apache Spark is an open-source parallel processing framework that supports in-memory processing to boost the performance of big-data analytic applications. Azure HDInsight offers a fully managed Spark service with many benefits.
Apache HBase is an open-source, NoSQL database built on Hadoop: a distributed, scalable big data store that provides real-time read/write access to large datasets. HDInsight HBase is offered as a managed cluster that is integrated into the Azure environment, and it provides many features as a big data store.
Spark-HBase Connector
The Spark-HBase Connector provides an easy way to store and access data in HBase clusters from Spark jobs. HBase excels at the highest levels of data scale, so existing Spark customers should explore it as a storage option. Likewise, customers who already have HDInsight HBase clusters can access their data from Spark jobs without moving it to any other storage medium. In both cases, the connector is extremely useful.
Steps to use the connector
If you want to use the latest connector, check out the source code with git and build it from here; otherwise you can use the binary JAR directly from the Hortonworks repo.
Following are the detailed steps.
First, copy hbase-site.xml from your HBase cluster into the Spark configuration directory so that the connector can locate the HBase ZooKeeper quorum:
sudo cp hbase-site.xml /etc/spark2/conf/
Then submit your application, pulling the shc-core package from the Hortonworks repository and shipping hbase-site.xml to the executors (replace the --class value and the trailing path with your own application's main class and JAR):
$SPARK_HOME/bin/spark-submit --verbose --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn-client --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/ --files /etc/spark2/conf/hbase-site.xml /To/your/application/jar
For example, you can run the examples from the package with the following command.
$SPARK_HOME/bin/spark-submit --verbose --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn-cluster --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/ --files /etc/spark2/conf/hbase-site.xml /home/sshuser/shc/examples/target/shc-examples-1.1.1-2.1-s_2.11-SNAPSHOT.jar
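If you build your own application, the dependency can be declared as follows. This is a minimal sbt sketch, not part of the connector's documentation; the coordinates and repository URL are the same ones used in the spark-submit commands above. Note the single % rather than %%, because the Scala version suffix is already encoded in the version string.
// build.sbt (sketch): resolve shc-core from the Hortonworks public repository
resolvers += "Hortonworks Repo" at "https://repo.hortonworks.com/content/groups/public/"
libraryDependencies += "com.hortonworks" % "shc-core" % "1.1.1-2.1-s_2.11"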
Sample Program
Start the Spark shell with the connector package loaded:
$SPARK_HOME/bin/spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
Following are the important commands.
import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}
val sparkConf = new SparkConf().setAppName("HBaseTestApp")
import spark.sqlContext.implicits._
case class HBaseRecordAirline(col0: String, Year: Int, Quarter: Int, Month: Int, DayofMonth: Int, DayOfWeek: Int, FlightDate: Int, UniqueCarrier: String, AirlineID: String)
defined class HBaseRecordAirline
object HBaseRecordAirlineTest {
  def apply(i: Int): HBaseRecordAirline = {
    val s = s"row${"%03d".format(i)}"
    HBaseRecordAirline(s, i, i, i, i, i, i, s, s)
  }
}
defined module HBaseRecordAirlineTest
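For reference, HBaseRecordAirlineTest(0) yields HBaseRecordAirline("row000", 0, 0, 0, 0, 0, 0, "row000", "row000").
Next, define the catalog that tells the connector how to map the DataFrame to HBase. It names the table, and each entry under "columns" maps a DataFrame column to an HBase column family and qualifier; the special "cf":"rowkey" entry maps col0 to the HBase row key.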
val cat = s"""{
| |"table":{"namespace":"default", "name":"airdelaydata_scv_Test1"},
| |"rowkey":"key",
| |"columns":{
| |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
| |"Year":{"cf":"Year", "col":"Year", "type":"int"},
| |"Quarter":{"cf":"Quarter", "col":"Quarter", "type":"int"},
| |"Month":{"cf":"Month", "col":"Month", "type":"int"},
| |"DayofMonth":{"cf":"DayofMonth", "col":"DayofMonth", "type":"int"},
| |"DayOfWeek":{"cf":"DayOfWeek", "col":"DayOfWeek", "type":"int"},
| |"FlightDate":{"cf":"FlightDate", "col":"FlightDate", "type":"int"},
| |"UniqueCarrier":{"cf":"UniqueCarrier", "col":"UniqueCarrier", "type":"string"},
| |"AirlineID":{"cf":"AirlineID", "col":"AirlineID", "type":"string"}
| |}
| |}""".stripMargin
Generate some sample records and write them to HBase. The HBaseTableCatalog.newTable -> "5" option asks the connector to create the table (with five regions) if it does not already exist:
val data = (0 to 8).map { i => HBaseRecordAirlineTest(i) }
sc.parallelize(data).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
To read the data back, define a helper that loads a DataFrame from the catalog, register the result as a temporary view, and query it with SQL:
def withCatalog(cat: String): DataFrame = {
  spark.sqlContext.read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()
}
val df = withCatalog(cat)
df.createOrReplaceTempView("table1")
val c = spark.sqlContext.sql("select AirlineID from table1")
c.show()
+---------+
|AirlineID|
+---------+
| row000|
| row001|
| row002|
| row003|
| row004|
| row005|
| row006|
| row007|
| row008|
+---------+
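Besides SQL, you can query through the DataFrame API, and the connector prunes columns and pushes filters on the row key down to HBase. The following is a minimal sketch against the df defined above (this particular query is illustrative, not from the original example):
// Hypothetical query: select two columns and filter on the row-key column.
// The column pruning and row-key filter are pushed down to HBase by the connector.
val subset = df.select("col0", "AirlineID").filter($"col0" <= "row004")
subset.show()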
There is also a Jupyter notebook example showing how to use the Spark-HBase connector. Check it out here!