rxExecBy is a new API in the R Server 9.1 release. It partitions an input data source by keys and applies a user-defined function to each partition. gapply is a SparkR API that provides similar functionality: it groups a SparkDataFrame by the specified columns and applies an R function to each group.
The performance comparison of rxExecBy and gapply uses an HDI 3.5 cluster with 4 worker nodes and Spark 2.0. Each node is a Standard D4_v2 VM with 8 cores and 28 GB of memory. The Spark application configuration is:
The data set I use for the benchmark is the US airline on-time data from 2008 to 2012. I cleaned the data, keeping only 30 columns (here is a sample CSV file containing 1M rows), and preloaded it into Hive to make life easier for gapply, since gapply can only take a SparkDataFrame as its input data source.
First, we need to understand the two APIs, then define the performance story we want to tell and the metric we want to collect.
rxExecBy(inData, keys, func, funcParams = NULL, filterFunc = NULL, computeContext = rxGetOption("computeContext"), ...)
The API is very straightforward, so I will focus only on "func". This user-defined function is applied to each partition after the "group by" operation, and it must take "keys" and "data" as two required input arguments, where "keys" is a list of partitioning values and "data" is an RxXdfData (in RxSpark) data source for the partition. There is no restriction on the return value, but returning a big object is not recommended: all returned values are brought back to the client, and a big object will slow things down or even cause an out-of-memory (OOM) error.
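To make the "keys" and "data" contract concrete, here is a minimal base-R sketch of "partition by keys, then apply a UDF to each partition". It is only an analogy and does not show how rxExecBy works internally. The toy `flights` data frame, the `summarize_partition` UDF, and the `exec_by` helper are all hypothetical names I made up for illustration, and "data" is a plain data.frame here rather than an RxXdfData.

```r
# Base-R analogy of the rxExecBy UDF contract (not the real implementation).

# Hypothetical toy data standing in for the airline data set.
flights <- data.frame(
  Origin   = c("SEA", "SEA", "JFK", "JFK", "JFK"),
  Dest     = c("JFK", "JFK", "SEA", "LAX", "LAX"),
  ArrDelay = c(5, 15, -3, 20, 40)
)

# UDF with the two required arguments: `keys` is a list of partitioning
# values, `data` is the partition itself (a data.frame in this sketch).
summarize_partition <- function(keys, data) {
  # Keep the return value small: the keys plus two summary numbers.
  list(keys = keys, rows = nrow(data), meanDelay = mean(data$ArrDelay))
}

# Hypothetical helper: split by the key columns, call the UDF per group.
exec_by <- function(inData, keys, func) {
  groups <- split(inData, inData[keys], drop = TRUE)
  lapply(unname(groups), function(part) {
    keyValues <- as.list(part[1, keys, drop = FALSE])
    func(keyValues, part)
  })
}

results <- exec_by(flights, c("Origin", "Dest"), summarize_partition)
```

Each element of `results` corresponds to one distinct Origin+Dest pair, and everything the UDF returns ends up in that list on the caller's side, which is why small return values matter.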
gapply(x, cols, func, schema)
"func" accepts a user-defined function, and the differences compared to rxExecBy are:
Unlike other machine learning algorithms, rxExecBy and gapply only partition the data and apply a UDF to each partition. So the performance test mainly focuses on how the data is partitioned, what the UDF is, and the input data size.
The input data of the UDF is an RxXdfData for rxExecBy and an R data.frame for gapply. We need to define some UDFs that can take both RxXdfData and R data.frame as input, and some other UDFs that can only take an R data.frame. In the second case, the rxExecBy UDF needs an additional rxDataStep call to convert the RxXdfData into an R data.frame. Below are the details of the predefined UDFs:
The US airline on-time data for 2008 to 2012 has over 100M rows. I cut it at 1M, 2M, 5M, and 10M rows to get data sets of different sizes.
We define the performance metric as the number of rows that rxExecBy or gapply can process per second; the higher, the better.
Given a data set of M rows and a run time of T seconds, the metric value is M/T.
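As a small illustration of the M/T metric, the sketch below computes rows per second in base R. The function names `rows_per_second` and `measure_throughput` are hypothetical, and the numbers are made up for the example; they are not benchmark results.

```r
# Throughput metric: rows processed per second (higher is better).
rows_per_second <- function(rows, seconds) {
  stopifnot(rows >= 0, seconds > 0)
  rows / seconds
}

# Hypothetical helper: time an arbitrary run (a zero-argument function)
# and report M/T using wall-clock seconds.
measure_throughput <- function(rows, run) {
  elapsed <- system.time(run())[["elapsed"]]
  rows / max(elapsed, .Machine$double.eps)  # guard against a zero timing
}

# Made-up example: 1M rows processed in 50 seconds.
example <- rows_per_second(1e6, 50)  # 20000 rows per second
```

In a real run, `run` would wrap the rxExecBy or gapply call being measured, and `rows` would be the size of the input data set.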
A combination of key and UDF defines a test case. For each test case, I plot a chart to compare the performance and how it scales with data set size. Missing points in the charts indicate a run failure, caused by either a timeout (6000s) or OOM.
With key Origin+Dest, we cannot bring back the rxSummary or rxLogit results for all 100k partitions, because the returned objects are too large.
The tip here is to save the big object to HDFS inside the UDF and return only the file path.
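The pattern can be sketched in base R as follows. This is a rough sketch under loud assumptions: `outDir` is a local temporary directory standing in for an HDFS location, `lm` stands in for a heavier model such as rxLogit, and `save_and_return_path` and the `Distance` column are hypothetical. Writing to real HDFS from inside a UDF needs extra steps that are not shown here.

```r
# Assumption: a local directory stands in for an HDFS location.
outDir <- file.path(tempdir(), "udf_results")
dir.create(outDir, showWarnings = FALSE, recursive = TRUE)

# UDF that persists its large result and returns only the file path.
save_and_return_path <- function(keys, data) {
  # Stand-in for a large result such as a model or summary object.
  bigObject <- lm(ArrDelay ~ Distance, data = data)
  # Build a file name from the partition keys, e.g. "SEA_JFK.rds".
  fileName <- paste0(paste(unlist(keys), collapse = "_"), ".rds")
  path <- file.path(outDir, fileName)
  saveRDS(bigObject, path)
  path  # only this short string travels back to the client
}

# Hypothetical partition for one Origin+Dest pair.
part <- data.frame(
  ArrDelay = c(5, 15, 30, 12),
  Distance = c(300, 900, 2400, 700)
)
path <- save_and_return_path(list(Origin = "SEA", Dest = "JFK"), part)

# Later, load a single result on demand instead of collecting all of them.
model <- readRDS(path)
```

The client then receives one short path per partition and can read back only the results it actually needs.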
In general, rxExecBy does better than gapply, especially on big data sets. But I also found some areas where rxExecBy does not do well and could be improved further:
Hive data preparation script,
Performance benchmark script,