Performance: rxExecBy vs gapply on Spark
rxExecBy is a new API of R server release 9.1, it partitions input data source by keys and applies user defined function on each partitions. gapply is a SparkR API that also provides similiar functionality, it groups the SparkDataFrame using specified columns and applies the R function to each group.
Prepare
Environment
The performance comparison of rxExecBy and gapply uses a 4 worker nodes HDI 3.5 cluster with Spark 2.0. VM size of node is standard D4_v2, which is 8 cores and 28G memory. Spark application configuration are,
- Driver memory: 4g
- Executor number: 4
- Executor cores: 5
- Executor memory: 16g
- Executor overhead memory: 4g
The data set I'm using for the benchmark is US airline on-time data from 08 to 12. I clean the data leaving only 30 columns(here is a sample csv file containing 1M rows) and preload it into hive to make life easier for gapply, as gapply can only take SparkDataFrame as input data source.
API
First, we need to understand two APIs and define the performance story and metric we want to make and collect.
rxExecBy
rxExecBy(inData, keys, func, funcParams = NULL, filterFunc = NULL, computeContext = rxGetOption("computeContext"), ...)
The API is very straightforward, so I will only focus on "func". This user defined function will be applied to each partition after "grouping by" operation, and it needs to take "keys" and "data" as two reuqired input arguments, where "keys" is a list of partitioning values, and "data" is a RxXdfData(in RxSpark) data source of the partition. There's no restriction on the return value, but returning a big object is not recommended, because all returned values will be taken back to client, and big object will slow down the performance or even OOM.
gapply
gapply(x, cols, func, schema)
"func" accepts user defined function, and the differences comparing to rxExecBy are,
- In data source is R data.frame
- Return value has to be R data.frame
What to Test
Different from other machine learning algorithms, rxExecBy and gapply only do data partition and apply UDF to partition data. So the performance test is mainly focus on how to partition the data, what's the UDF and input data size.
Key
- DayOfWeek (7 keys)
- Dest (About 300 keys)
- Origin + Dest (About 100K keys)
UDF
The input data of UDF is RxXdfData for rxExecBy, and R data.frame for gapply. We need to define some UDFs that can take both RxXdfData and R data.frame as input, and some other UDFs that only can take R data.frame as input data. In the second case, an additional rxDataStep call is required for UDF of rxExecBy to convert RxXdfData into R data.frame. Below is the details of predefined UDFs,
- rxSummary, takes RxXdfData and R data.frame as input, return data set summary info.
- rxLogit, takes RxXdfData and R data.frame as input, return a logit regression model. This is a multi iterations algorithm comparing to single iteration of rxSummary.
- mean, takes R data.frame as input, return mean values for each columns of input partition data.
Data Set Size
US airline on-time data 08-12 has over 100M rows of data. I cut it on 1M, 2M, 5M, 10M rows and get different sizes.
Define the Metric
We define the performance metric to be number of rows that rxExecBy or gapply can process per second, the higher the better.
Given a M rows data set, assuming the run time is T, then metric value is M/T.
Benchmark
A combination of key and UDF defines a test case. For each test case, I plot a chart to compare the performance and scales on data set size. Missing points in the charts indicate a run failure of either time out(6000s) or OOM.
rxExecBy vs gapply
Key:DayOfWeek & UDF:rxSummary
Key:DayOfWeek & UDF:rxLogit
Key:DayOfWeek & UDF:mean
Key:Dest & UDF:rxSummary
Key:Dest & UDF:rxLogit
Key:Dest & UDF:mean
Key:Origin+Dest & UDF:mean
With key Origin+Dest, we can not take all 100k partitions’ results of rxSummary or rxLogit back, because of return object size is very huge.
The tip here is we can save big object into HDFS in UDF and only return the file path back.
Summary
Generally, rxExecBy is doing better than gapply, especially on big data set. But, I also find some areas that rxExecBy is not doing well and can have further improvements,
- On small data set, sometimes it doesn't fully use cluster cores.
E.g. by using DayOfWeek as key, running on 1M row data set will trigger the coalesce logic to partition all 7 tasks into one execution slot and run in sequence, leaving all other execution slot idle. - It combines all local return values of UDF together and uses R API "save" to store the combined result. However, "save" has very bad performance when R object is very big.
Script
Hive data prepare script,
Performance benchmark script,
Comments
- Anonymous
July 08, 2017
Thanks for this wonderful post. The information has currently assisted me with my job where I was stuck and also really did not recognize exactly what to do aas following step. Looking forward to your following articles!