Sdílet prostřednictvím


rxExecBy - Productivity and scale with partitioned data

There is often a need to train data for “many small models” instead of a “single big model”. Specifically, users may want to train separate models such as logistic regressions or boosted trees within groups (partitions) like “states”, “countries”, “device id”, etc. or they may want to compute summary statistics such as mean, min, max, median and do data management within groups. In these scenarios, analytics workflow generally is done in two phases: (1) data processing phase which is to take an input data set such as .csv files, SQL Server database tables and split it into multiple data partitions based on specified columns (e.g., “states”, “countries”, “device id”) and (2) computation phase which is to run analytics computation on individual data partitions.

The new release of Microsoft R Server 9.1 has introduced a set of necessary rx functions to allow users to quickly develop a solution to address the above scenarios. Please refer to this blog for an introduction to rxExecBy. In this post, we are going to consider side-by-side solutions for parallelly training a model on individual partitions, one is to use the RevoScaleR functions available in Microsoft R Server and the other is to use functions offered in open source packages including foreach, doParallel and BigMemory available in CRAN. The discussion focuses on the following topics of the two solutions running on local compute contexts:

  • Code complexity and usability
  • Scalability and memory boundedness
  • Performance
  • Flexibility

Code complexity and usability

To develop an end-to-end solution, we want to estimate how much time and effort is required to research available resources and prototype the solution to make it functional. For the RevoScaleR solution, it requires just a single RevoScaleR package that provides the new function rxExecBy for doing both partitioning and parallelly-by-partition execution. rxExecBy works in similar way as rxExec and other existing rx functions, hence if one is familiar with rx functions, it would be straightforward to quickly have rxExecBy up and running. For the non-RevoScaleR solution, specifically for the example in this section, two open source packages ‘foreach’ and ‘doParallel’ are needed for parallel execution and the R standard function ‘split’ is needed for partitioning.

Our examples below show the R scripts for a scenario that takes an input airline data set from a .csv file, partitions the dataset on ‘DayOfWeek’ and finally performs linear analysis on individual partitions in parallel. As can be seen, RevoScaleR solution looks simple using rxExecBy that handles both partitioning and model-fitting in one operation. With non-RevoScaleR alternatives, the solution involves two steps –  ‘split’ and ‘foreach’. In addition, note that the Airline dataset chosen in this example has 600K rows of data, which is small enough to fit in the memory, so that the ‘split’ function can be functional. In the next section, we will discuss scalability for large datasets and have an open source solution that is not memory bounded, in which the open source solution also shows more lengthy and complicated codes.

With RevoScaleR:

With foreach and doParallel:

Scalability and memory boundedness

RevoScaleR package is developed to enable running analytics with very large datasets that would exceed the memory and processing capabilities of any one machine. The new Rx functions to support partitioning feature in the RevoScaleR package 9.1 hold the same principle, i.e. they work on both small and large datasets and are not memory bounded. For a comparable open source solution, we use BigMemory package that provides functions to read a large data set into in-memory big.matrix from a backing file and functions to perform analysis with big.matrix objects. One limitation of using BigMemory package is that data items stored in big.matrix must have the same data types, this means if the input dataset has variables with mixed data types such as numeric, integer, character, factor, then this solution wouldn’t work.

For experiments with large datasets, we use the original airline data set posted at https://stat-computing.org/dataexpo/2009/the-data.html and preprocess it to merge into a single .csv file which contains 120M rows of data with 13 variables. Note that, all the variables in this dataset have integer values therefore the dataset can work with big.matrix functions.

As shown in the R scripts below, the RevoScaleR solution looks similar to what we have in the previous section, with a minor change for the input data file. For the non-RevoScaleR solution, a set of packages including foreach, doParallel and BigMemory needs to load and big.matrix functions are used instead.

With RevoScaleR

With foreach, doParallel and BigMemory:

Performance

We will evaluate performance of the two solutions discussed the previous section. Specifically, execution times of the two steps: data partitioning and parallel user function execution are measured. In the RevoScaleR solution, rxExecBy essentially performs both partitioning and parallel user function execution, thus to make it comparable with the open source solution, we modify the code to separate rxExecBy into two operations. For partition step, we use rxPartition, a new rx function introduced in the RevoScaleR 9.1. The idea of introducing rxPartition is to allow users to partition once and saving partitions into partitioned Xdf files. Then analytics can be run with rxExecBy repeatedly on the same set of partitions with different user functions, at different times, and/or machine without needing to re-partition the input data set again and again.

With RevoScaleR

With foreach, doParallel and BigMemory:

Results:

With RevoScaleR:
> print(paste0("Partition Time: ", parttm["elapsed"]))
[1] "Partition Time: 1455.91"
> print(paste0("Compute Time: ", exectm["elapsed"]))
[1] "Compute Time: 4.59999999999991"

With foreach, doParallel and BigMemory:
> print(paste0("Partition Time: ", parttm["elapsed"]))
[1] "Partition Time: 578.18"
> print(paste0("Compute Time: ", exectm["elapsed"]))
[1] "Compute Time: 81.2199999999999"

The results shown above are from one run on a Windows 10 machine with Intel Xeon 3.5GHz CPU of 8 logical processors, 32GB of RAM and regular hard drive for the airline dataset of 120M rows with 13 variables. For average values from 10 runs, the numbers have similar trends, partition times of RevoScaleR and non-RevoScaleR solutions are 1377.025 vs 563.492 seconds respectively and parallel-user-function-execution times of RevoScaleR and non-RevoScaleR are 3.486 vs 90.477 seconds respectively.

Partition time for RevoScaleR solution is about 2.5 times longer than non-RevoScaleR solution’s, this is because the partitioned Xdf is done in incremental fashion, where it loops through reading each chunk of input data, partitioning and writing partitions in individual .xdf files; and this process is currently single threaded; while big.matrix spends most time for preparing phase to read the input .csv file and write to a backing file and for partitioning phase, it just maintains indexes of partitions in memory and does not write to files. In future releases, the partitioning process for RevoScaleR will be enhanced with multi-threading to improve the performance further.

Execution time for RevoScaleR solution is about 25 times better than non-RevoScaleR solutions’. In addition, for RevoScaleR solution, the partition phase can just be done once, and the model fitting phase can be executed repeatedly with different user functions.

Flexibility

rxExecBy and rxPartition provide other capabilities to operate on existing partitioned data set which cannot be achieved as easily with alternative open-source solutions.

Reload an existing partitioned data set

As mentioned earlier, rxPartition allows partitioning an input data set by key values and save the results to a partitioned Xdf on disk. The partitioned Xdf can be then copied to another machine and loaded into an Xdf data source object to be used with rxExecBy. The sample code can be found in the previous section at lines 13-15 where the partitioned Xdf is stored on disk in ‘./airlinePartitionXdf’.

Append to existing partitioned data set

rxPartition also allows appending new data to an existing partitioned Xdf. This is to address a scenario where a large dataset, e.g., airline dataset for the years 1987-2008, has been partitioned earlier, and then a new dataset, e.g. airline dataset of the year 2009, is added. In this scenario, rxPartition can just run on the new dataset to append data to the existing partitioned Xdf. The R script with rxPartition shown in the previous section was for partitioning the large dataset of 120M rows, and the result was stored in the partitioned Xdf ‘airlinePartitionXdf’. The following example shows how to append to the existing partitioned Xdf ‘airlinePartitionXdf’:

Filter partitions

There is a scenario where user doesn’t want to train a model on a subset of partitions, this can be done by rxExecBy with the filterFunc argument. For example, the following R code fits linear models on weekdays partitions and excludes weekend partitions:

WODA – write once deploy anywhere

The R code with rxExecBy and rxPartition can run on different platforms with different data sources and compute context that require little changes. For example, to run the same analysis with SQL Server data source in SQL server compute context, the code just needs to be modified to specify the desired data source and compute context and the rxExecBy call remains the same.

In Summary

In this post, we have looked through solutions to fit a model on data partitions in parallel with sample codes. The discussion focused on four main aspects of the solutions: code complexity and usability, scalability and memory boundedness, performance and flexibility. Among those aspects, the solution of using RevoScaleR package has shown that it can be a good one-stop shop model to be able to quickly adopt and deploy for production. It has simple code, scales effortlessly with reasonable performance and provides many capabilities to flexibly handle data partitions. In contrast, the solution of using open source packages requires to put together functionalities provided in different packages and can show good performance for certain types of data running for specific scenarios.

Post co-authored by Doug Service and Sumit Kumar

For a comprehensive view of all the capabilities in Microsoft R Server 9.1, refer to this blog