Microsoft R Server - Using Hive data source in Spark compute context

Before the Microsoft R Server 9.0 release, performing analytics on Hive or Parquet data meant first exporting it manually to a supported format (e.g., CSV), possibly uploading the text data to HDFS, and then reading it with something like RxTextData. With Microsoft R Server 9.0, the Spark compute context supports Hive and Parquet data sources, so you can work with them directly. In this blog we will work through an example showing how to use the Hive data source (we will cover Parquet in a future blog).

Pre-requisites

To work through these examples you need a supported Hadoop cluster (e.g., HDInsight, Hortonworks HDP, Cloudera or MapR) with Spark and Hive available on the cluster. You can create an HDInsight cluster on Azure if you want to try out the examples.

If not already done, install Microsoft R Server 9.0 on your Hadoop cluster.

Loading Data into Hive

We will use the Airline Demo sample data supplied with Microsoft R Server and load it into Hive. If you already have data in Hive, you can work with that instead.

Create a SampleData directory in HDFS if it doesn't already exist (you might need appropriate HDFS permissions to create this directory, and you might need to grant access to the users who will run the examples):

hadoop fs -mkdir -p /share/SampleData

Copy the sample data provided with RevoScaleR package to HDFS:

hadoop fs -copyFromLocal /usr/lib64/microsoft-r/3.3/lib64/R/library/RevoScaleR/SampleData/* /share/SampleData/
hadoop fs -ls /share/SampleData
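The same staging can also be driven from an R session using RevoScaleR's HDFS helper functions. This is a minimal sketch, assuming RevoScaleR is installed on the node you run it from and that you only need the small airline CSV rather than the whole sample directory. The helper name stage_sample_data is hypothetical and is not part of RevoScaleR.

```r
# Sketch only: requires RevoScaleR (Microsoft R Server) and HDFS access.
# stage_sample_data is a hypothetical helper name, not a RevoScaleR function.
hdfs_dir <- "/share/SampleData"

stage_sample_data <- function(hdfs_dir) {
  # Local copy of the airline sample shipped with RevoScaleR
  local_csv <- file.path(rxGetOption("sampleDataDir"), "AirlineDemoSmall.csv")
  rxHadoopMakeDir(hdfs_dir)                   # counterpart of: hadoop fs -mkdir
  rxHadoopCopyFromLocal(local_csv, hdfs_dir)  # counterpart of: hadoop fs -copyFromLocal
  rxHadoopListFiles(hdfs_dir)                 # counterpart of: hadoop fs -ls
}

# Runs only where RevoScaleR is available (e.g., on the cluster edge node)
if (requireNamespace("RevoScaleR", quietly = TRUE)) {
  library(RevoScaleR)
  stage_sample_data(hdfs_dir)
}
```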

Start a Spark shell, through which we can load the data into a Hive table using Scala (you may need to specify a queue for which you have permission to submit Spark/YARN jobs, which you can do with the --queue <queue_name> parameter):

spark-shell --master yarn

Using Hive Datasource

Here is a simple use of the Hive data source, RxHiveData, to get summary information.
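A summary like the one in the output below could be produced along these lines. This is a sketch under assumptions rather than the exact code behind that output: the Hive table name airlinedemosmall is hypothetical (substitute the table you loaded), and the column names are taken from the output shown in this blog.

```r
# Sketch only: requires RevoScaleR on a cluster with Spark and Hive.
# "airlinedemosmall" is a hypothetical Hive table name; use your own.
hive_query <- "select * from airlinedemosmall"

# Declare dayofweek as a factor with explicit levels so that the
# category counts are reported in weekday order.
day_levels <- c("Monday", "Tuesday", "Wednesday", "Thursday",
                "Friday", "Saturday", "Sunday")
col_info <- list(dayofweek = list(type = "factor", levels = day_levels))

# Runs only where RevoScaleR is available (e.g., on the cluster edge node)
if (requireNamespace("RevoScaleR", quietly = TRUE)) {
  library(RevoScaleR)
  cc <- rxSparkConnect()                # switch to the Spark compute context
  hive_data <- RxHiveData(query = hive_query, colInfo = col_info)
  print(rxSummary(~., hive_data))       # summary statistics for all columns
  rxSparkDisconnect(cc)                 # release the Spark application
}
```

Supplying colInfo is optional; it is used here only to turn the weekday strings into a factor.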

OUTPUT:

> rxSummary(~., hive_data)
Call:
rxSummary(formula = ~., data = hive_data)

Summary Statistics Results for: ~.
Data: hive_data (RxSparkData Data Source)
Number of valid observations: 6e+05

Name       Mean     StdDev    Min        Max        ValidObs MissingObs
arrdelay   11.31794 40.688536 -86.000000 1490.00000 582628   17372
crsdeptime 13.48227  4.697566   0.016667   23.98333 600000       0

Category Counts for dayofweek
Number of categories: 7
Number of valid observations: 6e+05
Number of missing observations: 0

dayofweek Counts
Monday    97975
Tuesday   77725
Wednesday 78875
Thursday  81304
Friday    82987
Saturday  86159
Sunday    94975

Hive Datasource with Transformations

You can use the Hive data source together with other data sources, which lets you apply various kinds of data transformations and combine data from several sources.
The following code uses the Hive data source with a transformation and writes the output to an Xdf data source.
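One way to write such a step is sketched below. The table name airlinedemosmall and the HDFS output path are hypothetical, and the transform derives the arrdelay15 flag (arrival delay greater than 15 minutes) that appears in the variable information that follows.

```r
# Sketch only: requires RevoScaleR on a cluster with Spark and Hive.
# The table name and the output path are hypothetical; use your own.
hive_query <- "select * from airlinedemosmall"
xdf_path   <- "/share/AirlineDemoSmallXdf"

day_levels <- c("Monday", "Tuesday", "Wednesday", "Thursday",
                "Friday", "Saturday", "Sunday")
col_info <- list(dayofweek = list(type = "factor", levels = day_levels))

# Runs only where RevoScaleR is available (e.g., on the cluster edge node)
if (requireNamespace("RevoScaleR", quietly = TRUE)) {
  library(RevoScaleR)
  cc <- rxSparkConnect()
  hive_data <- RxHiveData(query = hive_query, colInfo = col_info)

  # Xdf output stored in HDFS
  xdfOutput <- RxXdfData(xdf_path, fileSystem = RxHdfsFileSystem())

  # Read from Hive, add a logical column, write the result as Xdf
  rxDataStep(inData = hive_data, outFile = xdfOutput,
             transforms = list(arrdelay15 = arrdelay > 15),
             overwrite = TRUE)

  print(rxGetVarInfo(xdfOutput))
  rxSparkDisconnect(cc)
}
```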

OUTPUT:

> rxGetVarInfo(xdfOutput)
Var 1: arrdelay, Type: integer, Low/High: (-86, 1490)
Var 2: crsdeptime, Type: numeric, Storage: float32, Low/High: (0.0167, 23.9833)
Var 3: dayofweek
       7 factor levels: Monday Tuesday Wednesday Thursday Friday Saturday Sunday
Var 4: arrdelay15, Type: logical, Low/High: (0, 1)

The following code uses the Hive data source with a transformation and returns the output as a data frame.
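A sketch of this pattern follows. When rxDataStep is called without an outFile, it returns the result as a data frame. The table name airlinedemosmall is hypothetical, and the row filter is purely illustrative: the threshold of 270 minutes is an assumption chosen to keep the result small, not necessarily the filter that produced the output below.

```r
# Sketch only: requires RevoScaleR on a cluster with Spark and Hive.
# The table name is hypothetical; the 270-minute filter is illustrative.
hive_query <- "select arrdelay, dayofweek from airlinedemosmall"

day_levels <- c("Monday", "Tuesday", "Wednesday", "Thursday",
                "Friday", "Saturday", "Sunday")
col_info <- list(dayofweek = list(type = "factor", levels = day_levels))

# Runs only where RevoScaleR is available (e.g., on the cluster edge node)
if (requireNamespace("RevoScaleR", quietly = TRUE)) {
  library(RevoScaleR)
  cc <- rxSparkConnect()
  hive_data <- RxHiveData(query = hive_query, colInfo = col_info)

  # No outFile: the selected rows come back as an in-memory data frame
  myData <- rxDataStep(inData = hive_data,
                       rowSelection = arrdelay > 270)

  print(head(myData))
  rxSparkDisconnect(cc)
}
```

Returning a data frame only makes sense when the filtered result is small enough to fit in the memory of the R session.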

OUTPUT:

> head(myData)
  arrdelay dayofweek
1      285    Monday
2      284   Tuesday
3      281   Tuesday
4      278 Wednesday
5      288 Wednesday
6      294 Wednesday

Transformations using Hive Query

The Hive query language is a powerful language with many of the capabilities of SQL. So why wait until the data is in R to do the transformations? You can do many of them within the Hive query itself and get back transformed output that you can use directly for modelling, prediction and so on. The following example uses the Airline data set we imported into Hive above to predict whether a flight will be delayed by more than 15 minutes, based on the day of the week and the departure time. Notice that the additional logical variable indicating whether a flight was delayed by more than 15 minutes (arrdelay15) is computed in the Hive query itself instead of in a later transform.
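The approach can be sketched as follows. The table name airlinedemosmall is hypothetical, and the query assumes arrdelay is stored as a numeric column, so the comparison yields a boolean (or NULL where the delay is missing) that rxLogit can use as the dependent variable.

```r
# Sketch only: requires RevoScaleR on a cluster with Spark and Hive.
# The table name is hypothetical; arrdelay is assumed to be numeric.
hive_query <- paste(
  "select arrdelay > 15 as arrdelay15, dayofweek, crsdeptime",
  "from airlinedemosmall"
)

day_levels <- c("Monday", "Tuesday", "Wednesday", "Thursday",
                "Friday", "Saturday", "Sunday")
col_info <- list(dayofweek = list(type = "factor", levels = day_levels))

# Runs only where RevoScaleR is available (e.g., on the cluster edge node)
if (requireNamespace("RevoScaleR", quietly = TRUE)) {
  library(RevoScaleR)
  cc <- rxSparkConnect()

  # The derived column arrdelay15 is computed by Hive, not by R
  hive_data <- RxHiveData(query = hive_query, colInfo = col_info)

  # Logistic regression on the transformed Hive output
  logitObj <- rxLogit(arrdelay15 ~ dayofweek + crsdeptime, data = hive_data)
  print(logitObj)

  rxSparkDisconnect(cc)
}
```

Pushing the derived column into the query means only the three columns the model needs are read from Hive.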

OUTPUT:

> logitObj
Logistic Regression Results for: arrdelay15 ~ dayofweek + crsdeptime
Data: hive_data (RxSparkData Data Source)
Dependent variable(s): arrdelay15
Total independent variables: 9 (Including number dropped: 1)
Number of valid observations: 582628
Number of missing observations: 17372

Coefficients:
                       arrdelay15
(Intercept)           -2.01814346
dayofweek=Monday       0.06295299
dayofweek=Tuesday     -0.09538265
dayofweek=Wednesday   -0.12945236
dayofweek=Thursday    -0.19226847
dayofweek=Friday       0.26043331
dayofweek=Saturday     0.01939645
dayofweek=Sunday       Dropped
crsdeptime             0.06846911

As you can see from the examples above, support for the Hive data source opens up many possibilities when working with data in Microsoft R Server with Spark. We will cover some more interesting scenarios in a future blog.

References

Get started using R Server on HDInsight /en-us/azure/hdinsight/hdinsight-hadoop-r-server-get-started
What's New in R Server 9.0.1 https://msdn.microsoft.com/en-us/microsoft-r/rserver-whats-new