Share via


rxExecBy: Partition Data by Key Values and Execute User Function on Each Partition

Description

Partition input data source by keys and apply user defined function on individual partitions. If input data source is already partitioned, apply user defined function on partitions directly. Currently supported in local, localpar, RxInSqlServer and RxSpark compute contexts.

Usage

  rxExecBy(inData, keys, func, funcParams = NULL, filterFunc = NULL, computeContext = rxGetOption("computeContext"), ...)


Arguments

inData

a data source object supported in currently active compute context, e.g., RxSqlServerData for RxInSqlServer and RxHiveData for RxSpark. In local and localpar compute contexts, a character string specifying a .xdf file or a data frame object can be also used.

keys

character vector of variable names to specify the values in those variables are used for partitioning.

func

the user function to be executed. The user function takes keys and data as two required input arguments where keys determines the partitioning values and data is a data source object of the corresponding partition. data can be a RxXdfData object or a RxODBCData object, which can be transformed to a standard R data frame by using rxDataStep method. The nodes or cores on which it is running are determined by the currently active compute context.

funcParams

list of additional arguments for the user function func.

filterFunc

the user function that takes a data frame of keys values as an input argument, applies filter to the keys values and returns a data frame containing rows whose keys values satisfy the filter conditions. The input data frame has similar format to the results returned by rxPartition which comprises of partitioning variables and an additional variable of partition data source. This filterFunc allows user to control what data partitions to be applied by the user function func. filterFunc currently is not supported in RxHadoopMR and RxSpark compute contexts.

computeContext

a RxComputeContext object.

...

additional arguments to be passed directly to the Compute Engine.

Value

A list which is the same length as the number of partitions in the inData argument. Each element in the top level list contains a three element list described below.

keys

a list which contains key values for the partition.

result

the object returned from the invocation of the user function with keys values. When an error occurs during the invocation of the user function the value will be NULL.

status

a string which takes the value of either "OK" or "Error". In RxSpark compute context, it may include additional warning and error messages.

Note

The user function can call any function defined in R packages which are loaded by default and pass parameters which are defined in base R, the default loaded packages, or user defined S3 classes. The user function won't work with global variables or functions in non-default loaded packages unless they are redefined or reloaded within the scope of the user function.

Author(s)

Microsoft Corporation Microsoft Technical Support

See Also

rxExecByPartition, rxImport, rxDataStep, RxTextData, RxXdfData, RxHiveData, RxSqlServerData

Examples


     ## Not run:

 ##############################################################################
 # run analytics with local compute context
 ##############################################################################

   # create an input xdf data source.
   inFile <- "claims.xdf"
   inFile <- file.path(dataPath = rxGetOption(opt = "sampleDataDir"), inFile)
   inXdfDataSource <- RxXdfData(file = inFile)

   # define an user function that builds a linear model
   ".linMod" <- function(keys, data)
   {
     result <- rxLinMod(formula = cost ~ number, data = data)
     return(result$coefficients[[1]])
   }

   # set local compute context with 4-way parallel
   rxSetComputeContext("localpar")
   rxOptions(numCoresToUse = 4)

   # define a filter function
   ".carFilter" <- function(partDF)
   {
       subset(partDF, car.age != "10+" & type == "A")
   }

   # call rxExecBy with no filterFunction
   results1 <- rxExecBy(inData = inXdfDataSource, keys = c("car.age", "type"), func = .linMod)

   # call rxExecBy with filterFunction
   results2 <- rxExecBy(inData = inXdfDataSource, keys = c("car.age", "type"), func = .linMod, filterFunc = .carFilter)

 ##############################################################################
 # run analytics with SQL Server compute context
 ##############################################################################

   # Note: for improved security, read connection string from a file, such as
   # sqlServerConnString <- readLines("sqlServerConnString.txt")

   sqlServerConnString <- "SERVER=hostname;DATABASE=RevoTestDB;UID=DBUser;PWD=Password;"
   inTable <- paste0("airlinedemosmall")
   sqlServerDataDS <- RxSqlServerData(table = inTable, connectionString = sqlServerConnString)

   # user function
   ".Count" <- function(keys, data, params)
   {
       myDF <- rxImport(inData = data)
       return (nrow(myDF))
   }

   # Set SQL Server compute context with level of parallelism = 2
   sqlServerCC <- RxInSqlServer(connectionString = sqlServerConnString, numTasks = 4)
   rxSetComputeContext(sqlServerCC)

   # Execute rxExecBy in SQL Server compute context
   sqlServerCCResults <- rxExecBy(inData = sqlServerDataDS, keys = c("DayOfWeek"), func = .Count)

 ##############################################################################
 # run analytics with RxSpark compute context
 ##############################################################################
   # start Spark app
   sparkCC <- rxSparkConnect()

   # define function to compute average delay
   ".AverageDelay" <- function(keys, data) {
     df <- rxDataStep(data)
     mean(df$ArrDelay, na.rm = TRUE)
   }

   # define colInfo
   colInfo <-
     list(
       ArrDelay = list(type = "numeric"),
       CRSDepTime = list(type = "numeric"),
       DayOfWeek = list(type = "string")
     )

   # create text data source with airline data
   textData <-
     RxTextData(
       file = "/share/SampleData/AirlineDemoSmall.csv",
       firstRowIsColNames = TRUE,
       colInfo = colInfo,
       fileSystem = RxHdfsFileSystem()
     )

   # group textData by day of week and get average delay on each day
   objs <- rxExecBy(textData, keys = c("DayOfWeek"), func = .AverageDelay)

   # transform objs to a data frame
   do.call(rbind, lapply(objs, unlist))

   # stop Spark app
   rxSparkDisconnect(sparkCC)
  ## End(Not run)