Best practices for executing embarrassingly parallel workloads with R Server on Spark

Introduction

An embarrassingly parallel workload is one that requires little or no effort to split into a number of parallel tasks, typically because there is little or no dependency or need for communication between those tasks, or for sharing results between them.

 

In this blog, we highlight the following best practices for running these types of workloads with R Server on Spark:

 

1. Setting Intel Math Kernel Libraries threads to 1
2. Optimizations to the Spark compute context

 

Setting the scene

We devise an embarrassingly parallel problem that requires running a time series model for many different datasets (often described as the "many models" problem). As this blog is concerned with how to parallelize the problem in Spark, we keep the analytics simple by fitting an ARIMA(1,0,0) time series model to 100,000 individual time series - in reality we would do something more elaborate using the forecast package.

The data is conveniently stored in 100,000 files where each file contains the monthly sales of an individual product SKU sold at Contoso supermarket:

 

/time_series_data_folder
    sku1.csv
    sku2.csv
    ...
    sku100000.csv

 

Utilizing foreach code

For those who are familiar with the open source R foreach package, we could parallelize this problem across all the available cores on a single machine using foreach %dopar% with a registered parallel backend:
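The snippet below is a minimal sketch of that approach using the doParallel backend; the folder path, the monthly_sales column name and the plain stats::arima call are assumptions based on the set-up described above.

library(foreach)
library(doParallel)

# Register a parallel backend that uses all of the local cores
cl <- makeCluster(detectCores())
registerDoParallel(cl)

skuFiles <- list.files("/time_series_data_folder", pattern = "\\.csv$", full.names = TRUE)

results <- foreach(f = skuFiles, .combine = rbind) %dopar% {
  sales <- read.csv(f)
  fit   <- arima(sales$monthly_sales, order = c(1, 0, 0))   # simple ARIMA(1,0,0) fit
  data.frame(sku = basename(f), ar1 = coef(fit)[["ar1"]])
}

stopCluster(cl)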

For those who have already built a library of code using foreach, we can easily scale this out to n cores across a Spark cluster using Microsoft R Server by making the following minor changes to the code:

 

1. Define a Spark compute context using RxSpark
2. Register the Spark backend for foreach using registerDoRSR

 

This is how we achieve this in Microsoft R Server on Spark:
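A minimal sketch of those two changes is shown below; the RxSpark arguments are left at their defaults here, and in practice you would supply the connection details appropriate to your cluster.

library(RevoScaleR)
library(doRSR)

# 1. Define the Spark compute context
sparkCC <- RxSpark()
rxSetComputeContext(sparkCC)

# 2. Register the R Server backend for foreach, so %dopar% runs on Spark
registerDoRSR(computeContext = sparkCC)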

For further details on arguments to RxSpark, see https://msdn.microsoft.com/en-us/microsoft-r/scaler/rxspark

To make the pleasingly parallel job performant in Spark, we also recommend setting:

 

1. The task chunk size so that the tasks are divided equally across the cores
2. The Intel Math Kernel Library threads to 1 (setMKLthreads()) inside the foreach loop

 

As we are already parallelizing the problem, we do not want to introduce multi-threading of linear algebra/matrix maths functions, as this will lead to contention. NOTE: you will also need to ensure that any other implicit parallelism within your function is accounted for, e.g. parLapply.

As the data would sit in HDFS rather than the Linux filesystem, we utilize Microsoft R Server's rxDataStep to read each file from HDFS. If you need to copy data from the Linux filesystem to HDFS, R Server provides a utility function for this (note that it can copy a whole folder):
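The utility function is rxHadoopCopyFromLocal; for example (the HDFS destination path below is illustrative):

# Copy the whole local folder of SKU files into HDFS
rxHadoopCopyFromLocal(source = "/time_series_data_folder",
                      dest   = "/user/RevoShare/time_series_data")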

The foreach code now looks like:
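A sketch of the updated loop is below. The doRSR chunk-size option, the HDFS path and the column name are assumptions; the key points are that tasks are handed out in even chunks, each task restricts MKL to a single thread, and each SKU file is read from HDFS with rxDataStep.

hdfsFS   <- RxHdfsFileSystem()
skuFiles <- paste0("/user/RevoShare/time_series_data/sku", 1:100000, ".csv")
nCores   <- 60   # total usable cores across the cluster (assumed)

results <- foreach(f = skuFiles, .combine = rbind,
                   .options.rsr = list(chunkSize = ceiling(length(skuFiles) / nCores))) %dopar% {
  setMKLthreads(1)                                          # single MKL thread per task
  sales <- rxDataStep(RxTextData(f, fileSystem = hdfsFS))   # read the SKU file from HDFS
  fit   <- arima(sales$monthly_sales, order = c(1, 0, 0))
  data.frame(sku = basename(f), ar1 = coef(fit)[["ar1"]])
}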

rxExec paradigm

For those who have used rxExec in Microsoft R Server to run embarrassingly parallel jobs, the code would look like:
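The sketch below mirrors the foreach version under the same assumptions (paths and column names are illustrative):

tsPrediction <- function(skuFile) {
  setMKLthreads(1)                                # single MKL thread per task
  hdfsFS <- RxHdfsFileSystem()
  sales  <- rxDataStep(RxTextData(skuFile, fileSystem = hdfsFS))
  fit    <- arima(sales$monthly_sales, order = c(1, 0, 0))
  data.frame(sku = basename(skuFile), ar1 = coef(fit)[["ar1"]])
}

rxSetComputeContext(sparkCC)   # the RxSpark compute context defined earlier

results <- rxExec(tsPrediction,
                  skuFile       = rxElemArg(skuFiles),               # one file per task
                  taskChunkSize = ceiling(length(skuFiles) / nCores))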

The function tsPrediction is effectively the body of the foreach loop, and we parallelize it using rxExec.

 

A note on packages

In our time series example above we could have used auto.arima in Rob Hyndman’s excellent forecast package (https://mran.microsoft.com/package/forecast/). To use packages in foreach and rxExec we use the same syntax that we would normally use for these functions, i.e.
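For example, foreach exposes a .packages argument and rxExec a packagesToLoad argument. A minimal sketch, reusing the names from the earlier snippets and swapping auto.arima in for the plain arima fit:

# foreach: load the forecast package on the workers via .packages
results <- foreach(f = skuFiles, .combine = rbind, .packages = "forecast") %dopar% {
  setMKLthreads(1)
  sales <- rxDataStep(RxTextData(f, fileSystem = hdfsFS))
  fit   <- auto.arima(sales$monthly_sales)
  data.frame(sku = basename(f), aicc = fit$aicc)
}

# rxExec: load the forecast package on the workers via packagesToLoad
results <- rxExec(tsPrediction, skuFile = rxElemArg(skuFiles),
                  packagesToLoad = "forecast")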

However, we must ensure the package is installed on each data/worker node in the cluster (for HDInsight customers, see /en-us/azure/hdinsight/hdinsight-hadoop-r-server-get-started).

 

Conclusion

In summary, below are the best practices that should be followed for optimal execution of embarrassingly parallel workloads.

 

1. Always read objects from HDFS when you are on Spark. Do not pass large objects around the cluster.
2. Set the Intel Math Kernel Library threads to 1 to prevent contention.
3. Optimize the Spark compute context (see the sketch after this list):

       a. executorMem and executorOverheadMem – setting these lower than the RxSpark default (4 GB) gives much more parallelism, because you do not need so much resource for each executor

       b. numExecutors – this should be set to the number of nodes in the cluster

       c. executorCores – this should be set to the number of usable cores in the entire cluster divided by the number of nodes. By 'usable cores' we mean numberOfClusterCores - numberOfNodes, i.e. we reserve 1 core per node for other Hadoop processes
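As an illustration, a compute context tuned along these lines for a hypothetical cluster of 4 worker nodes with 16 cores each might look like the sketch below (the figures are purely illustrative):

numNodes     <- 4
coresPerNode <- 16
usableCores  <- numNodes * coresPerNode - numNodes   # reserve 1 core per node for Hadoop

sparkCC <- RxSpark(numExecutors        = numNodes,                # one executor per node
                   executorCores       = usableCores / numNodes,  # 15 cores per executor
                   executorMem         = "1g",                    # lower than the 4g default
                   executorOverheadMem = "1g")
rxSetComputeContext(sparkCC)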

 

Blog Authors

This blog is a joint effort among the following three authors:

  • Simon Field, Solution Architect, Data Insight
  • Sam Kemp, Technology Solutions Professional
  • Premal Shah, Senior Program Manager