Performance Optimization When Using rxExec() to Parallelize Algorithms
Microsoft R Server is designed around the concept of PEMAs (Parallel External Memory Algorithms). An external memory algorithm is an algorithm which does not require all data to be present in memory, it can process data sequentially, generating intermediate results and then iterating on those results. These algorithms are widely used for statistical analysis and machine learning, and allow large datasets to be computed on relatively small amounts of memory. PEMAs are implementations of these algorithms which allow chunks of data to be processed in parallel on many cores of the system. Such algorithms require the separation of the requisitely sequential code segments from the parallelizable segments.
The rxExec() function in Microsoft R Server is not, in and of itself a PEMA, but is a tool provided to the user for the writing of parallel algorithms. Out of the box parallel MRS Algorithms, such as rxBtrees() or rxLinMod() , to name a few, are intrinsically designed to run on various distributed systems. rxExec() allows users to parallelize algorithms for operation on many cores of a single system; and allows algorithms which were not designed to operate on distributed data process data on distributed systems.
Another important thing to note is Microsoft R Server operates as a closed source package within the open source R interpreter. This is achieved by using an open source, Microsoft maintained, library: RxLink. RxLink mitigates passing data between the open source R process(es) and the closed source Microsoft R Server process(es) via named pipes. (for details, please see: Importing and Exporting Large In-Memory Dataframes). This mode of operation leads us to make performance versus ease-of-use considerations when writing custom PEMAs using rxExec() ; requiring us to pay attention to how we manage marshalling data from open source R to Microsoft R Server.
rxExec() takes as its [required] arguments:
- FUN, this function will be executed on nodes of a distributed system or cores of a local machine. The configuration of this execution depends on the active compute context. Setting compute context will be briefly demonstrated in this article.
- “…” or elemArgs, if arguments are specified as a comma separated list, they will be passed to the parallelized function and used on every execution of the function. If ElemArgs is specified, it must be a list whose length matches the number of parallelizations of an algorithm, the list will be iterated as every parallel operation is started, and pass each unique input to a separate instance of the function. Both depend on the arguments required by said function, this can be empty. elemArgs usage will be demonstrated in this article.
- elemType, this defines the distributed computing mode to use, and the allowable types depend on the compute context. The default is "nodes" for a distributed compute context, which tells rxExec() to execute as one parallel execution per distributed node, and "cores" for a local compute context, which tells rxExec() to execute as one parallel execution per logical core.
A common use of rxExec() is to define a custom function for execution and passing the function, as well as its input parameters, to rxExec() , which will be provided to each parallel execution of the function.
For the purpose of generality, and providing examples which everyone can try, this article will focus on the local parallel compute context, “localpar”, that is, using rxExec() to parallelized algorithms across many cores of your local machine.
Let’s examine a simple example. We will use some open source packages to define a random dataset, and use rxExec() to take sums of subsets of the data in parallel, using system.time() to measure execution speed. In the first part of the example, we will use rxExec() the "naïve" way. Afterwards we will make some optimizations to our script, and utilize rxExec() in a more "correct" way. We will compare the performance and explain optimizations that can help the user maximize their performance.
Example:
When we execute this code, we see the following execution times (in seconds):
User:
14.06
System:
26.56
Elapsed:
74.00
and the result is:
[1] TRUE
This was the naïve way to use rxExec() : Write a function and then call that function, passing the input parameters, within rxExec() . Unfortunately, we saw less than stellar performance. Considering the volume of data, this is not terrible, but, we can do much better with a few simple optimizations.
The fundamental error with our non-optimized code is that we ignore how Microsoft R Server works inside of open source R. Because Microsoft R Server is a separate process from R itself, they do not have a shared memory space, all messages between the main R process and the Microsoft R Server process must be sent via named pipes.
To send a message across a named pipe, the RxLink library will serialize data contained within the R process's memory and send it to Microsoft R Server, after Microsoft R Server has complete operating on data in its memory space, it will serialize the results and send them back to RxLink for deserialization.
if we examine our call to rxExec() , we can see the bottleneck which is degrading our performance:
To optimize this parallel algorithm, we want to remove the overhead caused by serialization/deserialization. To do this, we will change how susbsetSums() accesses the dataset.
Example, the setup segment of the code is identical:
Executing this script, we get the execution time of as (in seconds):
saveRDS()
+
rxExec()
=
Total
User:
0.70
0.02
0.72
System:
0.14
0.05
0.19
Elapsed:
0.86
9.72
10.58
Which is performance improvement of:
User:
98.47%
(19.53x Faster)
System:
99.28%
(139.79x Faster)
Elapsed:
88.10%
(6.99x Faster)
Viewed graphically, this is:
In conclusion, the performance bottleneck of transmitting large chunks across the named pipe is staggering, and completely defeats the speed increase that you get from rxExec() 's ability to parallelize algorithms. When writing R scripts for Microsoft R Server, a user cannot ignore the overhead of data marshalling between open source R and Microsoft R Server.
Comments
- Anonymous
November 16, 2016
Great article - really insightful thanks!Might need to add "library(iterators)" to your sample code? Was getting 'Error in eval(expr, envir, enclos) : could not find function "iter"' until I imported it.- Anonymous
November 16, 2016
Hi KB,Thank you for the suggestion! I have edited the script. I believe the version of MRS I was using for these snippets has iterators loaded at launch via the RProfile.site file.Thank you again,- Kirill
- Anonymous