rxExec: Run A Function on Multiple Nodes or Cores
Description
Allows distributed execution of a function in parallel across nodes (computers) or cores of a "compute context" such as a cluster.
Usage
rxExec(FUN, ... , elemArgs, elemType = "nodes", oncePerElem = FALSE, timesToRun = -1L,
packagesToLoad = NULL, execObjects = NULL, taskChunkSize = NULL, quote = FALSE,
consoleOutput = NULL, autoCleanup = NULL, continueOnFailure = TRUE,
RNGseed = NULL, RNGkind = NULL, foreachOpts = NULL)
Arguments
FUN
the function to be executed; the nodes or cores on which it is run are determined by the currently-active compute context and by the other arguments of rxExec.
...
arguments passed to the function FUN each time it is executed. Separate argument values can be sent for each computation by wrapping a vector or list of argument values in rxElemArg.
elemArgs
a vector or list specifying arguments to FUN. This allows a different set of arguments to be passed to FUN each time it is executed. The length of the vector or list must match the number of times the function will be executed. Each of these elements will be passed in turn to FUN. Using a list of lists allows multiple named or unnamed parameters to be passed. If elemArgs has length 1, that argument is passed to all compute elements (and thus is an alternative to ...). The elements of elemArgs may be named; if they are node names, those elements will be passed to those nodes. Alternatively, they can be named "rxElem1", "rxElem2", and so on, in which case the list of returned values will have the corresponding names. See the Details section for more information. This is an alternative to using rxElemArg one or more times.
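The dispatch rule for elemArgs can be sketched sequentially in base R. This is a minimal illustration under stated assumptions, not the RevoScaleR implementation: the helper emulateElemArgs is hypothetical, and it assumes each element is expanded into FUN's arguments with do.call (the quote argument refers to underlying do.call calls). It does not model the length-1 broadcast case or the routing of elements to named nodes.

```r
# Hypothetical sequential stand-in for rxExec's elemArgs handling;
# it is NOT part of RevoScaleR and does no distributed execution.
emulateElemArgs <- function(FUN, elemArgs) {
  # One call to FUN per element; as.list() turns an atomic vector such as
  # c(a, b) into separate positional arguments, and lapply() keeps names.
  lapply(elemArgs, function(args) do.call(FUN, as.list(args)))
}

addTwo <- function(a, b = 0) a + b
res <- emulateElemArgs(addTwo, list(rxElem1 = list(1, 2),
                                    rxElem2 = list(a = 10, b = 5),
                                    rxElem3 = c(7)))
str(res)  # a named list: rxElem1, rxElem2, rxElem3
```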
elemType
[Deprecated]. The distributed computing mode to be used. This parameter is deprecated and is not honored by any of the supported compute contexts. It might be reinstated in the future.
oncePerElem
logical flag. If TRUE and elemType="nodes", FUN will be run exactly once on each specified node. In this case, each element of the return list will be named with the name of the node that computed that element. If FALSE, a node may be used more than once (but never simultaneously). oncePerElem must be set to FALSE if elemType="cores". This parameter is ignored if the active compute context is local.
timesToRun
integer specifying the total number of instances of the function FUN to run. If timesToRun=-1, the default, the number of runs is set to the length of the elemArgs argument, if it exists; otherwise, it is set to the number of nodes or cores specified in the compute context object, if that number is exact. In the latter case, if elemType="nodes" and a single set of arguments is being passed to each node, each element of the return list will be named with the name of the node that computed that element. If timesToRun
is not -1, it must be consistent with this other information.
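The default rule for timesToRun can be written out as a small decision function. The helper resolveTimesToRun and its argument nComputeElements are hypothetical, and the consistency check for an explicit value is an assumption: it only rejects a mismatch with a multi-element elemArgs, because the text does not spell out what "consistent" means with respect to node or core counts.

```r
# Hypothetical helper mirroring the documented default for timesToRun = -1.
resolveTimesToRun <- function(timesToRun = -1L, elemArgs = NULL,
                              nComputeElements = NA_integer_) {
  if (timesToRun == -1L) {
    # Default: prefer the length of elemArgs, else the compute element count
    n <- if (!is.null(elemArgs)) length(elemArgs) else nComputeElements
    if (is.na(n)) stop("number of runs cannot be inferred")
    return(as.integer(n))
  }
  # Assumed consistency check: a multi-element elemArgs must match exactly
  if (length(elemArgs) > 1L && length(elemArgs) != timesToRun)
    stop("timesToRun does not match length(elemArgs)")
  as.integer(timesToRun)
}

resolveTimesToRun(elemArgs = list(1, 2, 3))  # 3
resolveTimesToRun(nComputeElements = 4L)     # 4
```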
packagesToLoad
optional character vector specifying additional packages to be loaded on the nodes for this job. If provided, these packages are loaded after any packagesToLoad specified in the current distributed compute context.
execObjects
optional character vector specifying additional objects to be exported to the nodes for this job, or an environment containing these objects. The specified objects are added to FUN's environment, unless that environment is locked, in which case they are added to the environment in which FUN is evaluated. For efficiency, do not use this argument to export large data objects; instead, pass a reference to the data in a shared storage location (e.g., HDFS).
taskChunkSize
optional integer scalar specifying the number of tasks to be executed per compute element, or worker. By submitting tasks in chunks, you can avoid some of the overhead of starting new R processes over and over. For example, if you are running thousands of identical simulations on a cluster, it makes sense to specify taskChunkSize so that each worker can do its allotment of tasks in a single R process. This argument is incompatible with the oncePerElem argument; if both are supplied, taskChunkSize is ignored. It is also incompatible with lists supplied to elemArgs
with compute element names.
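The effect of taskChunkSize can be pictured as grouping task indices so that each group runs in one R process. The helper chunkTasks below is hypothetical and only illustrates the grouping; how RevoScaleR actually schedules chunks onto workers is not described here.

```r
# Hypothetical illustration: split task indices into groups of at most
# taskChunkSize; each group would be handled by a single worker process.
chunkTasks <- function(nTasks, taskChunkSize) {
  idx <- seq_len(nTasks)
  split(idx, ceiling(idx / taskChunkSize))
}

# Mirrors the example rxExec(sqrt, rxElemArg(1:100), taskChunkSize=50):
# 100 tasks become 2 chunks of 50, evaluated here sequentially.
chunks <- chunkTasks(100, 50)
results <- unlist(lapply(chunks, sqrt), use.names = FALSE)
```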
quote
logical flag. If TRUE, underlying calls to do.call have the corresponding flag set to TRUE. This is primarily of use to the doRSR package, but may be of use to other users.
consoleOutput
NULL or logical value. If TRUE, the console output from all of the processes is printed to the user console. Note that the output from different nodes or cores may be interleaved in an unpredictable way. If FALSE, no console output is displayed. Output can be retrieved with the function rxGetJobOutput for a non-waiting job. If not NULL, this flag overrides the value set in the compute context when the job was submitted. If NULL, the setting in the compute context is used. This parameter is ignored if the active compute context is local.
autoCleanup
NULL or logical value. If TRUE, artifacts created by the distributed computing job are deleted when the results are returned or retrieved using rxGetJobResults. If FALSE, the artifacts are not deleted, and the results can be retrieved repeatedly using rxGetJobResults, and the console output using rxGetJobOutput, until rxCleanupJobs is used to delete the artifacts. If not NULL, this flag overrides the value set in the compute context when the job was submitted. If you routinely set autoCleanup=FALSE, you may eventually fill your hard disk with compute artifacts. If you set autoCleanup=TRUE and experience performance degradation on a Windows XP client, consider setting autoCleanup=FALSE. This parameter is ignored if the active compute context is local.
continueOnFailure
NULL or logical value. If TRUE, the default, then if an individual instance of a job fails due to a hardware or network failure, an attempt will be made to rerun that job. (R syntax errors, however, will cause immediate failure as usual.) Furthermore, if an instance of a job fails because of an error in user code, the remaining instances continue, and the failed instance produces a warning when the output is collected. Additionally, the position in the returned list where the failure occurred will contain the error rather than a result. This parameter is ignored if the active compute context is local or RxForeachDoPar.
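The shape of the result under continueOnFailure=TRUE (a failed instance leaves its error in place, the other instances still return values, and a warning is raised) can be mimicked locally with tryCatch. This is a hypothetical sequential sketch: runAllTasks is not a RevoScaleR function, and it does not model the automatic rerun after hardware or network failures.

```r
# Hypothetical sketch: run every task, keep going past user-code errors,
# store each error object at the failed task's position, and warn once.
runAllTasks <- function(FUN, argList) {
  out <- lapply(argList, function(a) tryCatch(FUN(a), error = function(e) e))
  failed <- vapply(out, inherits, logical(1), what = "error")
  if (any(failed)) {
    warning(sum(failed), " task(s) failed at position(s) ",
            paste(which(failed), collapse = ", "))
  }
  out
}

checkedSqrt <- function(x) if (x < 0) stop("negative input") else sqrt(x)
res <- suppressWarnings(runAllTasks(checkedSqrt, list(4, -1, 9)))
# res[[1]] is 2, res[[2]] is the error object, res[[3]] is 3
```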
RNGseed
NULL, the string "auto", or an integer to be used as the seed for parallel random number generation. See the Details section for a description of how the "auto" string is used.
RNGkind
NULL or a character string specifying the type of random number generator to be used. Allowable strings are the strings accepted by rxRngNewStream, "auto", and, if the active compute context is local parallel, "L'Ecuyer-CMRG" (for compatibility with the parallel package). See the Details section for a description of how the "auto" string is used.
foreachOpts
NULL or a list containing options to be passed to the foreach parallel computing backend. See foreach for details.
Details
rxExec has very limited functionality for RxInSqlServer in CTP3; computations are performed sequentially.
There are two primary sets of use cases: in the first set, each computing element (node or core) gets the same argument values; in this case, do not use elemArgs or rxElemArg. In the second, each element gets a different set of arguments; use rxElemArg for each argument that has different values, or an elemArgs whose length is equal to the number of times FUN will be executed.
Set 1: All computing elements get the same arguments. For example,
rxExec(FUN, arg1, arg2)
Set 2: Every computing element gets a different set of arguments. If rxElemArg is used, the length of the vector or list for the enclosed argument must equal the number of compute elements. For example,
rxExec(FUN, arg1 = 1, arg2 = rxElemArg(1:5))
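In base R terms, the calling pattern above resembles mapply() with MoreArgs: arguments passed normally are shared by every call, while the values wrapped in rxElemArg are handed out one per call. This is only an analogy for the argument semantics; mapply runs sequentially in the current session, and the FUN defined here is a hypothetical stand-in.

```r
# Hypothetical stand-in for FUN in the example above.
FUN <- function(arg1, arg2) arg1 * 100 + arg2

# Analogy for rxExec(FUN, arg1 = 1, arg2 = rxElemArg(1:5)):
# arg1 is shared by all five calls, arg2 takes one value per call.
perCall <- mapply(FUN, arg2 = 1:5, MoreArgs = list(arg1 = 1),
                  SIMPLIFY = FALSE)
unlist(perCall)  # 101 102 103 104 105
```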
If elemArgs is a nested list, the individual lists are passed to the compute resources according to the following:
1. The argument lists can be named according to the compute resource to which each component list should be assigned. For example, rxExec(FUN, elemArgs=list(compute1=list(arg1,arg2), compute2=list(arg3, arg4))). In this case, the list of arguments must be the same length as the list of nodes requested for the current compute context, and have the same names. If oncePerElem=TRUE and elemType="nodes", the computation will be performed once on each requested node, and each node is assured of getting the argument with its name. If oncePerElem=FALSE, there is no guarantee that each node will be used in the processing, so arguments intended for a particular node may not be used; they must still be provided, however.
The component names must be valid R syntactic names. If you have nodes on your cluster with names that are not valid R syntactic names, use the function rxMakeRNodeNames on the node name to determine the appropriate name to give the list component. When the return value is a list with elements named by compute node, the node names are as returned by the rxMakeRNodeNames function.
2. The argument lists, if not named, will be passed to the compute resources allowed by the compute context according to their position in the list. This is useful when you don't care which nodes or cores the function is executed on but want a different set of arguments to be used on each resource. For example,
rxExec(FUN, elemArgs=list(list(arg1,arg2), list(arg3, arg4)))
or
rxExec(FUN, elemArgs=list(c(arg1, arg2), list(arg3, arg4)))
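To check whether a node name can be used directly as a list component name, it can be compared with base R's make.names(), which returns syntactically valid names. This check is an assumption layered on the text above: make.names() only tests R syntax, and rxMakeRNodeNames may map invalid names differently, so rxMakeRNodeNames remains the way to obtain the names rxExec actually expects. The node names below are hypothetical.

```r
# TRUE where a name is already a valid R syntactic name.
isSyntacticName <- function(x) x == make.names(x)

# Hypothetical cluster node names.
nodeNames <- c("compute1", "compute-2", "3node")
isSyntacticName(nodeNames)  # TRUE FALSE FALSE
```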
The arguments RNGseed and RNGkind can be used to control random number generation in the workers. By default, both are NULL and no special random number control is used. If either or both of RNGseed and RNGkind are set to "auto", a parallel random number stream is initialized on each worker, using the "MT2203" generator and separate substreams for each worker. If other non-NULL valid values are supplied for these arguments, they are used as is for the "MT2203" generator, which supports multiple substreams; for other rxRngNewStream-supported generators, the seed is used as the starting point of a sequence of seeds, one for each worker. In the special case of a local parallel compute context, the "L'Ecuyer-CMRG" generator can be specified, in which case the parallel package's clusterSetRNGStream function is called on the internally generated parallel cluster.
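The "L'Ecuyer-CMRG" case relies on the parallel package's stream mechanism: clusterSetRNGStream() seeds the first worker from its iseed and gives each later worker the next stream via nextRNGStream(). The sequential sketch below reproduces that seeding pattern without starting a cluster. The helpers makeWorkerStreams and drawFromStream are hypothetical, and the sketch illustrates the parallel package only, not the "MT2203" path used for "auto".

```r
library(parallel)

# Hypothetical helper: build one L'Ecuyer-CMRG seed per worker from a
# single seed, following the pattern clusterSetRNGStream() uses.
# Side effect: switches this session's RNG kind to "L'Ecuyer-CMRG".
makeWorkerStreams <- function(nWorkers, seed) {
  RNGkind("L'Ecuyer-CMRG")
  set.seed(seed)
  s <- get(".Random.seed", envir = globalenv())
  streams <- vector("list", nWorkers)
  for (i in seq_len(nWorkers)) {
    streams[[i]] <- s
    s <- nextRNGStream(s)  # advance to the next independent stream
  }
  streams
}

# Hypothetical helper: draw n uniforms as a worker holding `stream` would.
drawFromStream <- function(stream, n) {
  assign(".Random.seed", stream, envir = globalenv())
  runif(n)
}

streams <- makeWorkerStreams(3, seed = 42)
drawFromStream(streams[[2]], 3)
```

Because every stream is derived from the single seed, rerunning with the same seed gives each worker the same numbers, while different workers draw from non-overlapping streams.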
Value
If a waiting compute context is active, a list with an element for each job, where each element contains the value(s) returned by that job's function call(s). If a non-waiting compute context is active, a jobInfo object. See rxGetJobResults.
Author(s)
Microsoft Corporation Microsoft Technical Support
See Also
rxElemArg, rxGetJobResults, rxGetJobStatus, RxComputeContext, rxSetComputeContext, RxSpark, RxHadoopMR, RxForeachDoPar, RxLocalParallel, RxLocalSeq, rxGetNodeInfo, rxMakeRNodeNames, rxRngNewStream
Examples
## Not run:
## Run function with no parameters
rxExec(getwd)
## Pass the same set of arguments to each compute element
rxExec(list.files, all.files=TRUE, full.names=TRUE)
## Run function with the same vector sent as the first
## argument to each compute element
## The values 1 to 10 will be printed 10 times
x <- 1:10
rxExec(print, x, elemType = "cores", timesToRun = 10)
## Pass a different argument value to each compute element
## The values 1 to 10 will be printed once each
rxExec(print, rxElemArg( x ), elemType = "cores")
## Extract different columns from a data frame on different nodes
set.seed(100)
myData <- data.frame(x = 1:100, y = rep(c("a", "b", "c", "d"), 25),
z = rnorm(100), w = runif(100))
myVarsToKeep = list(
c("x", "y"),
c("x", "z"),
c("x", "w"),
c("y", "z"),
c("z", "w"))
# myVarDataFrames will be a list of data frames
myVarDataFrames <- rxExec(rxDataStep, inData = myData, varsToKeep = rxElemArg(myVarsToKeep))
## Extract different rows from the data frame on different nodes
myRowSelection = list(
expression(y == 'a'),
expression(y == 'b'),
expression(y == 'c'),
expression(y == 'd'),
expression(z > 0))
myRowDataFrames <- rxExec(rxDataStep, inData = myData, rowSelection = rxElemArg(myRowSelection))
## Use the taskChunkSize argument
rxExec(sqrt, rxElemArg(1:100), taskChunkSize=50)
## End(Not run)