RxSpark: Create Spark compute context, connect and disconnect a Spark application
Description
RxSpark creates a Spark compute context. rxSparkConnect creates the compute context object with RxSpark and then immediately starts the remote Spark application. rxSparkDisconnect shuts down the remote Spark application with rxStopEngine and switches to a local compute context. All rx* function calls after this will run in a local compute context.
Usage
RxSpark(
object,
hdfsShareDir = paste( "/user/RevoShare", Sys.info()[["user"]], sep="/" ),
shareDir = paste( "/var/RevoShare", Sys.info()[["user"]], sep="/" ),
clientShareDir = rxGetDefaultTmpDirByOS(),
sshUsername = Sys.info()[["user"]],
sshHostname = NULL,
sshSwitches = "",
sshProfileScript = NULL,
sshClientDir = "",
nameNode = rxGetOption("hdfsHost"),
master = "yarn",
jobTrackerURL = NULL,
port = rxGetOption("hdfsPort"),
onClusterNode = NULL,
wait = TRUE,
numExecutors = rxGetOption("spark.numExecutors"),
executorCores = rxGetOption("spark.executorCores"),
executorMem = rxGetOption("spark.executorMem"),
driverMem = "4g",
executorOverheadMem = rxGetOption("spark.executorOverheadMem"),
extraSparkConfig = "",
persistentRun = FALSE,
sparkReduceMethod = "auto",
idleTimeout = 3600,
suppressWarning = TRUE,
consoleOutput = FALSE,
showOutputWhileWaiting = TRUE,
autoCleanup = TRUE,
workingDir = NULL,
dataPath = NULL,
outDataPath = NULL,
fileSystem = NULL,
packagesToLoad = NULL,
resultsTimeout = 15,
tmpFSWorkDir = NULL,
... )
rxSparkConnect(
hdfsShareDir = paste( "/user/RevoShare", Sys.info()[["user"]], sep="/" ),
shareDir = paste( "/var/RevoShare", Sys.info()[["user"]], sep="/" ),
clientShareDir = rxGetDefaultTmpDirByOS(),
sshUsername = Sys.info()[["user"]],
sshHostname = NULL,
sshSwitches = "",
sshProfileScript = NULL,
sshClientDir = "",
nameNode = rxGetOption("hdfsHost"),
master = "yarn",
jobTrackerURL = NULL,
port = rxGetOption("hdfsPort"),
onClusterNode = NULL,
numExecutors = rxGetOption("spark.numExecutors"),
executorCores = rxGetOption("spark.executorCores"),
executorMem = rxGetOption("spark.executorMem"),
driverMem = "4g",
executorOverheadMem = rxGetOption("spark.executorOverheadMem"),
extraSparkConfig = "",
sparkReduceMethod = "auto",
idleTimeout = 10000,
suppressWarning = TRUE,
consoleOutput = FALSE,
showOutputWhileWaiting = TRUE,
autoCleanup = TRUE,
workingDir = NULL,
dataPath = NULL,
outDataPath = NULL,
fileSystem = NULL,
packagesToLoad = NULL,
resultsTimeout = 15,
reset = FALSE,
interop = NULL,
tmpFSWorkDir = NULL,
... )
rxSparkDisconnect(computeContext = rxGetOption("computeContext"))
Arguments
object
object of class RxSpark. This argument is optional. If supplied, the values of the other specified arguments are used to replace those of object, and the modified object is returned.
hdfsShareDir
character string specifying the file sharing location within HDFS. You must have permissions to read and write to this location. When running in Spark local mode, this parameter is ignored and is forced to the value of shareDir.
shareDir
character string specifying the directory on the master (perhaps edge) node that is shared among all the nodes of the cluster and any client host. You must have permissions to read and write in this directory.
clientShareDir
character string specifying the absolute path of the temporary directory on the client. Defaults to /tmp for POSIX-compliant non-Windows clients. For Windows and non-compliant POSIX clients, defaults to the value of the TEMP environment variable if defined, else to the TMP environment variable if defined, else to NULL. If the default directory does not exist, defaults to NULL. UNC paths ("\\host\dir") are not supported.
sshUsername
character string specifying the username for making an ssh connection to the Spark cluster. This is not needed if you are running your R client directly on the cluster. Defaults to the username of the user running the R client (that is, the value of Sys.info()[["user"]]).
sshHostname
character string specifying the hostname or IP address of the Spark cluster node or edge node that the client will log into for launching Spark jobs and for copying files between the client machine and the Spark cluster. Defaults to the hostname of the machine running the R client (that is, the value of Sys.info()[["nodename"]]). This field is only used if onClusterNode is NULL or FALSE. If you are using PuTTY on a Windows system, this can be the name of a saved PuTTY session that can include the user name and authentication file to use.
sshSwitches
character string specifying any switches needed for making an ssh connection to the Spark cluster. This is not needed if you are running your R client directly on the cluster.
sshProfileScript
optional character string specifying the absolute path to a profile script on the sshHostname host. This is used when the target ssh host does not automatically read in a .bash_profile, .profile or other shell environment configuration file for the definition of requisite variables.
sshClientDir
character string specifying the Windows directory where Cygwin's ssh.exe and scp.exe or PuTTY's plink.exe and pscp.exe executables can be found. Needed only for Windows. Not needed if these executables are on the Windows Path or if Cygwin's location can be found in the Windows Registry. Defaults to the empty string.
nameNode
character string specifying the Spark name node hostname or IP address. Typically you can leave this at its default value. If set to a value other than "default" or the empty string (see below), this must be an address that can be resolved by the data nodes and used by them to contact the name node. Depending on your cluster, it may need to be set to a private network address such as "master.local". If set to the empty string, "", then the master process will set this to the name of the node on which it is running, as returned by Sys.info()[["nodename"]]. This is likely to work when sshHostname points to the name node, or when sshHostname is not specified and the R client is running on the name node. If you are running in Spark local mode, this parameter defaults to "file:///"; otherwise it defaults to rxGetOption("hdfsHost").
master
character string specifying the master URL passed to Spark. The value of this parameter can be "yarn", "standalone", or "local". The Spark master URL formats (except for mesos) described at https://spark.apache.org/docs/latest/submitting-applications.html are also supported.
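For example, Spark's master URL syntax can be used to request local mode with a fixed number of worker threads. A minimal sketch, assuming RevoScaleR is installed and Spark is available on the machine (like the examples below, it will not run elsewhere):

```r
# Connect in Spark local mode with two worker threads
cc <- rxSparkConnect(master = "local[2]")
# ... run rx* functions against the Spark compute context ...
rxSparkDisconnect(cc)  # shut down and return to a local compute context
```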
jobTrackerURL
character scalar specifying the full URL for the jobtracker web interface. This is used only for the purpose of loading the job tracker web page from the rxLaunchClusterJobManager convenience function. It is never used for job control, and its specification in the compute context is completely optional. See the rxLaunchClusterJobManager page for more information.
port
numeric scalar specifying the port used by the name node for HDFS. Must be castable to an integer. Defaults to rxGetOption("hdfsPort").
onClusterNode
logical scalar or NULL specifying whether the user is initiating the job from a client that will connect to either an edge node or an actual cluster node, or directly from either an edge node or a node within the cluster. If set to FALSE or NULL, then sshHostname must be a valid host.
wait
logical scalar. If TRUE, or if persistentRun is TRUE, the job will be blocking and the invoking function will not return until the job has completed or has failed. Otherwise, the job will be non-blocking and the invoking function will return, allowing you to continue running other R code. The object rxgLastPendingJob is created with the job information. You can pass this object to the rxGetJobStatus function to check on the processing status of the job. rxWaitForJob will change a non-waiting job to a waiting job. Conversely, pressing ESC changes a waiting job to a non-waiting job, provided that the scheduler has accepted the job. If you press ESC before the job has been accepted, the job is canceled.
numExecutors
numeric scalar specifying the number of executors in Spark, which is equivalent to parameter --num-executors in spark-submit app. If not specified, the default behavior is to launch as many executors as possible, which may use up all resources and prevent other users from sharing the cluster.
executorCores
numeric scalar specifying the number of cores per executor, which is equivalent to parameter --executor-cores in spark-submit app.
executorMem
character string specifying memory per executor (e.g., 1000M, 2G), which is equivalent to parameter --executor-memory in spark-submit app.
driverMem
character string specifying memory for driver (e.g., 1000M, 2G), which is equivalent to parameter --driver-memory in spark-submit app.
executorOverheadMem
character string specifying memory overhead per executor (e.g. 1000M, 2G), which is equivalent to setting spark.yarn.executor.memoryOverhead in YARN. Increasing this value will allocate more memory for the R process and the ScaleR engine process in the YARN executors, so it may help resolve job failure or executor lost issues.
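The resource parameters above correspond one-to-one with spark-submit options. As an illustrative sketch (the sizes below are hypothetical placeholders, not tuning recommendations; RevoScaleR and a YARN cluster are assumed):

```r
cc <- rxSparkConnect(
    numExecutors = 4,            # --num-executors
    executorCores = 2,           # --executor-cores
    executorMem = "4g",          # --executor-memory
    driverMem = "2g",            # --driver-memory
    executorOverheadMem = "2g")  # spark.yarn.executor.memoryOverhead
```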
extraSparkConfig
character string specifying extra arbitrary Spark properties (e.g., "--conf spark.speculation=true --conf spark.yarn.queue=aQueue"), which is equivalent to additional parameters passed into the spark-submit app.
persistentRun
EXPERIMENTAL. logical scalar. If TRUE, the Spark application (and associated processes) will persist across jobs until the idleTimeout is reached or the rxStopEngine function is called explicitly. This avoids the overhead of launching a new Spark application for each job. If FALSE, a new Spark application will be launched when a job starts and will be terminated when the job completes.
sparkReduceMethod
character string specifying the strategy for reducing the results of parallel tasks into a final result: "oneStage", "twoStage", or "auto". With "oneStage", parallel task results are reduced to one result with a single reduce function; with "twoStage", results are first reduced to square-root size with the first reduce function and then reduced to the final result with the second reduce function; with "auto", Spark selects "oneStage" or "twoStage" automatically.
idleTimeout
numeric scalar specifying the number of seconds of idle time (i.e., not running any Spark job) before the system kills the Spark process. Setting a value greater than 600 is recommended. A negative value disables the timeout. sparklyr interoperation has no timeout.
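Taken together, persistentRun and idleTimeout control the lifetime of the Spark application. A hedged sketch using RxSpark (the timeout value is illustrative; a configured cluster is assumed):

```r
# Reuse one Spark application across jobs; kill it after 2 hours idle
cc <- RxSpark(persistentRun = TRUE, idleTimeout = 7200)
rxSetComputeContext(cc)
```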
suppressWarning
logical scalar. If TRUE, suppress the warning message regarding missing Spark application parameters.
consoleOutput
logical scalar. If TRUE, causes the standard output of the R processes to be printed to the user console.
showOutputWhileWaiting
logical scalar. If TRUE, causes the standard output of the remote primary R and Spark job process to be printed to the user console while waiting for (blocking on) a job.
autoCleanup
logical scalar. If TRUE, the default behavior is to clean up the temporary computational artifacts and delete the result objects upon retrieval. If FALSE, the computational results are not deleted, and the results may be acquired using rxGetJobResults, and the output via rxGetJobOutput, until rxCleanupJobs is used to delete the results and other artifacts. Leaving this flag set to FALSE can result in an accumulation of compute artifacts, which you may eventually need to delete before they fill up your hard drive.
workingDir
character string specifying a working directory for the processes on the master node.
dataPath
NOT YET IMPLEMENTED. character vector defining the search path(s) for the data source(s).
outDataPath
NOT YET IMPLEMENTED. NULL or character vector defining the search path(s) for new output data file(s). If not NULL, this overrides any specification for outDataPath in rxOptions.
fileSystem
NULL or an RxHdfsFileSystem object to use as the default file system for data sources created while this compute context is active.
packagesToLoad
optional character vector specifying additional packages to be loaded on the nodes when jobs are run in this compute context.
resultsTimeout
numeric value indicating for how long attempts should be made to retrieve results from the cluster. Under normal conditions, results are available immediately. However, under certain high-load conditions, the processes on the nodes may be reported as completed before the results have been fully committed to disk by the operating system. Increase this parameter if results retrieval is failing on high-load clusters.
tmpFSWorkDir
character string specifying the temporary working directory on worker nodes. It defaults to /dev/shm to take advantage of the in-memory temporary file system for a performance gain; when the size of /dev/shm is less than 1 GB, the default switches to /tmp to guarantee sufficient space. You can specify a different location if the size of /dev/shm is insufficient. Make sure that the YARN run-as user has permission to read and write to this location.
reset
if TRUE, all cached Spark data frames will be freed and all existing Spark applications that belong to the current user will be shut down.
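For instance, reset can be used to start from a clean slate when stale applications or cached data frames are suspected. A minimal sketch (assumes RevoScaleR on a configured cluster):

```r
# Free cached Spark data frames and shut down this user's
# existing Spark applications, then start a fresh one
cc <- rxSparkConnect(reset = TRUE)
```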
interop
NULL, or a character string or vector of package names for RevoScaleR interoperation. Currently the "sparklyr" package is supported.
computeContext
Spark compute context to be terminated by rxSparkDisconnect.
...
additional arguments to be passed directly to the Microsoft R Services Compute Engine.
Details
This compute context is supported for Cloudera, Hortonworks, and MapR Hadoop distributions.
Value
object of class RxSpark.
Author(s)
Microsoft Corporation Microsoft Technical Support
See Also
rxGetJobStatus,
rxGetJobOutput,
rxGetJobResults,
rxCleanupJobs,
RxHadoopMR,
RxInSqlServer,
RxComputeContext,
rxSetComputeContext,
RxSpark-class.
Examples
## Not run:
##############################################################################
# Running client on edge node
##############################################################################
hadoopCC <- RxSpark()
##############################################################################
# Running from a Windows client
# (requires Cygwin and/or PuTTY)
##############################################################################
mySshUsername <- "user1"
mySshHostname <- "12.345.678.90" #public facing cluster IP address
mySshSwitches <- "-i /home/yourName/user1.pem" #use .ppk file with PuTTY
myShareDir <- paste("/var/RevoShare", mySshUsername, sep ="/")
myHdfsShareDir <- paste("/user/RevoShare", mySshUsername, sep="/")
mySparkCluster <- RxSpark(
hdfsShareDir = myHdfsShareDir,
shareDir = myShareDir,
sshUsername = mySshUsername,
sshHostname = mySshHostname,
sshSwitches = mySshSwitches)
#############################################################################
## rxSparkConnect example
myHadoopCluster <- rxSparkConnect()
##rxSparkDisconnect example
rxSparkDisconnect(myHadoopCluster)
#############################################################################
## sparklyr interoperation example
library("sparklyr")
cc <- rxSparkConnect(interop = "sparklyr")
sc <- rxGetSparklyrConnection(cc)
mtcars_tbl <- copy_to(sc, mtcars)
hive_in_data <- RxHiveData(table = "mtcars")
rxSummary(~., data = hive_in_data)
## End(Not run)