Asynchronous web service consumption via batch processing with mrsdeploy

Important

This content is being retired and may not be updated in the future. The support for Machine Learning Server will end on July 1, 2022. For more information, see What's happening to Machine Learning Server?

Applies to: Machine Learning Server, Microsoft R Server 9.1

In this article, you can learn how to consume a web service asynchronously, which is especially useful with large input data sets and long-running computations. The typical approach to consuming web services, "Request Response" consumption, involves a single API call to execute the code in that web service once. The "Asynchronous Batch" approach involves the execution of code without manual intervention using multiple asynchronous API calls on a specific web service sent as a single request to Machine Learning Server. Then, Machine Learning Server immediately executes those operations once for every row of data provided.

Asynchronous batch workflow

Generally speaking, the process for asynchronous batch consumption of a web service involves the following:

  1. Call the web service on which the batch execution should be run
  2. Define the data records for the batch execution task
  3. Start (or cancel) the batch execution task
  4. Monitor task and interact with results

Use these following public API functions to define, start, and interact with your batch executions.

Asynchronous batch execution in R with Machine Learning Server

End-to-end workflow example

Use this sample code to follow along with the workflow described in greater detail in the following sections.

Important

Be sure to replace the remoteLogin() function with the correct login details for your configuration. Connecting to Machine Learning Server using the mrsdeploy package is covered in this article.

##          EXAMPLE: DEPLOY MODEL & BATCH CONSUME SERVICE               ##

##########################################################################
#              Create & Test a Logistic Regression Model                 #
##########################################################################

# Use logistic regression equation of vehicle transmission in the data set 
# 'mtcars' to estimate the probability of a vehicle being fitted with a 
# manual transmission based on horsepower (hp) and weight (wt)

# Create glm model with `mtcars` dataset
carsModel <- glm(formula = am ~ hp + wt, data = mtcars, family = binomial)

# Produce a prediction function that can use the model
manualTransmission <- function(hp, wt) {
  # --- Build a plot to demonstrate files in results ---
  png(file = "Histogram.png", bg = "transparent")
  hist(mtcars$hp, breaks = 10, col = "red", xlab = "Horsepower",
       main="Histogram of Horsepower")
  dev.off()
  
  # --- Perdict and return answer ---
  newdata <- data.frame(hp = hp, wt = wt)
  predict(carsModel, newdata, type = "response")
}

# test function locally by printing results
print(manualTransmission(120, 2.8)) # 0.6418125

##########################################################################
#                 Log into Server                            #
##########################################################################

# Use `remoteLogin` to authenticate local admin account. 
# Use session = false so that no remote R session started
remoteLogin("http://localhost:12800", 
            username = "admin", 
            password = "{{YOUR_PASSWORD}}",
            session = FALSE)

##########################################################################
#                   Publish Model as a Service                          #
##########################################################################

# Generate a unique serviceName for demos and assign to variable serviceName
serviceName <- paste0("mtService", round(as.numeric(Sys.time()), 0))

# Publish as service using publishService() function from `mrsdeploy` 
# package. Use the name variable and provide unique version number.
# Assign service to the variable `api`
api <- publishService(
  serviceName,
  code = manualTransmission,
  model = carsModel,
  inputs = list(hp = "numeric", wt = "numeric"),
  outputs = list(answer = "numeric"),
  artifacts = c("Histogram.png"),
  v = "v1.0.0"
)

# Consume service by calling function, `manualTransmission` contained 
# in this service
result <- api$manualTransmission(120, 2.8)

# Print response output named `answer`
print(result$output("answer")) # 0.6418125   

##########################################################################

##########################################################################
#            Perform Batch Consumption & Get Swagger in R                #
##########################################################################

# Get the service using getService() function from `mrsdeploy`
# Assign service to the variable `txService`.
txService <- getService(serviceName, "v1.0.0")

# Define the record data for the batch execution task.  Record data comes 
# from a data.frame called mtcars. Note: mtcars is a data.frame of 
# 11 cols with names (mpg, cyl, ..., carb) and 32 rows of numerics. 
# Assign to the batch object called 'txBatch'.
records <- head(mtcars[, c(4, 6)], 2) # hp & wt
txBatch <- txService$batch(records) 

# Set thread count using parallelCount. Default is 10.
# txBatch <- txService$batch(records, parallelCount = 5) 

# Start the batch task
txBatch <- txBatch$start() 

# Get the task execution id to reference during or after its execution:
id <- txBatch$id() 

# If you need to cancel the batch execution, try this:
# txBatch$cancel()

# Monitor batch execution results with results().
# Check results every 3 seconds until task finishes or fails. 
# Assign returned results to batch result object we called 'batchres'.
batchRes <- NULL
while(TRUE) {
  batchRes <- txBatch$results(showPartialResult = TRUE)  #Default is true

  if (batchRes$state == txBatch$STATE$failed) { stop("Batch execution failed") } 
  if (batchRes$state == txBatch$STATE$complete) { break }
  
  message("Polling for asynchronous batch to complete...")
  Sys.sleep(3)
}

# Once the batch task is complete, get the execution records by index from 
# the batch results object, 'batchRes'. This object is the service output.

# For every record, return these results (totalItemCount = # of data records)
for(i in seq(batchRes$totalItemCount)) {
  
  #1. List of every artifact that was generated by this execution index.
  files <- txBatch$listArtifacts(i)
  
  #2. Get the contents of each artifact returned in the previous list.
  for (fileName in files) {
    content <- txBatch$artifact(i, fileName)
    if (is.null(content)) { stop("Unable to get file") }
  }
  
  #3. Download artifacts from execution index to the current working directory  
  # unless a dest = "<path>" is specified.
  
    # Download of a single named artifact
    txBatch$download(i, "Histogram.png")
  
    # Download of all artifacts
    txBatch$download(i, dest = getwd())
  
  #4. Get results for a given index row 
  answer <- batchRes$execution(1)$outputParameters$answer
  answer
}

Public functions for batch

You can use the following supported public functions to consume a service asynchronously.

Batch functions performed on the service object

Once you get the service object, use these public functions on that object.

Function Usage Description
batch view Define the data records to be batched and the thread count
getBatchExecutions view Get the list of batch execution identifiers
getBatch view Get batch object using its unique execution identifier

Batch functions performed on the batch object

Once you have the batch object, use these public functions to interact with it.

Function Description Usage
start view Starts the execution of a batch scoring operation
cancel view Cancel the named batch execution
id view Get the execution identifier for the named batch process
STATE view Poll for the state of the batch execution (failed, complete, ...)
results view Poll for batch execution results, partial or full results as defined
execution view Get results for a given index row returned as an array
listArtifacts view List of every artifact files that was generated by this execution index
artifact view Print the contents of the named artifact file generated by the batch execution
download view Download one or all artifact files from execution index

1. Get the web service

Once you have authenticated, retrieve the web service from the server, assign it to a variable, and define the inputs to it as record data in a data frame, CSV, or TSV.

Batching begins by retrieving the web service containing the code against which you score the data records you define next. You can get a service using its name and version with the getService() function from mrsdeploy. The result is a service object, which in our example is called txService.
The getService function is covered in detail in the article "How to interact with and consume web services in R."

Syntax: getService("<serviceName>", "<version>")

Example:

# Get the service using getService() function from `mrsdeploy`
# Assign service to the variable `txService`.
txService <- getService("mtService", "v1.0.0")

2. Define the data records to be batched

Next, use the public api function batch to define the input record data for the batch and set the number of concurrent threads for processing.

Syntax: batch(inputs, parallelCount = 10)

Argument Description
inputs Specify the R data.frame name directly, or specify a flat list filename and convert it to a data.frame using the base R function, read.csv.
parallelCount Default value is 10. Specify the number of concurrent threads that can be dedicated to processing records in the batch. Take care not to set a number so high that it negatively impacts performance.

Returns: The batch object

Example:

# INPUTS = data.frame
# Use mtcars data.frame as input. Assign to batch object 'txBatch'.
# Reduce thread count to 5.
records <- head(mtcars[, c(4, 6)], 2) # hp & wt
txBatch <- txService$batch(records, parallelCount = 5)

# INPUT = Flat CSV converted to a data.frame using read.csv 
# Assign data.frame to 'records' variable. Then, use 'records' as input.
records <- read.csv("mtcars.csv")
txBatch  <- myService$batch(records, parallelCount = 15)
      
# INPUT = TSV file converted to a data.frame using read.csv 
# Declare the separator
records <- read.csv("mtcars.tsv", sep = "\t")
txBatch  <- myService$batch(records)

3. Start, find, or cancel the batch execution

Next, use the public api functions to start the asynchronous batch execution on the batch object, monitor the execution, or even cancel it.

Start batch execution task

Start the batch task with start(). Machine Learning Server starts the batch execution and assigns an ID to the execution and returns the batch object.

Syntax: start()

No arguments.

Returns: The batch object

Example: txBatch <- txBatch$start()

Note

We recommend you always use the id function after you start the execution so that you can find it easily with this id later such as: txBatch$start()$id()


Get batch ID

Get the batch task's identifier from the service object so you can reference it during or after its execution using id().

Syntax: id()

No arguments.

Returns: The ID for the named batch object.

Example: id <- txBatch$id()


### Get batch by ID

Syntax: getBatch(id)

Argument Description
id The batch execution identifiers

Returns: The Batch object

Example:

service <- getService("name", "version")
# Get a Services existing batch by execution Id
txBatch <- service$getBatch("my-executionId")   
print(txBatch)

List the Batch execution identifiers

Syntax: getBatchExecutions()

No arguments

Returns: List of batch execution identifiers

Example: getBatchExecutions()


Cancel execution

Cancel the batch execution using cancel().

Syntax: cancel()

No arguments.

Returns: The batch object

Example: txBatch$cancel()

4. Monitor, retrieve, and interact with results

While the batch task is running, you can monitor and poll the results. Once the batch task has completed, you can get the web service consumption output by index from the batch results object, including:

  • Monitor execution results and status
  • Get results for a given index row returned as an array
  • Get a list of every file that was generated by this execution index
  • Print the contents of a specific artifact or all artifacts returned
  • Download artifacts from execution index

Monitor execution results and status

There are several public functions you can use to get the results and status of a batch execution.

– Monitor or get the batch execution results

  • Syntax: results(showPartialResults = TRUE)
     

    Argument Description
    showPartialResults This argument returns the already processed results of the batch execution even if it has not been fully completed. If showPartialResults = FALSE, then returns only the results if the execution has completed.
  • Returns: A batch result object is returned, which in our example is called batchRes.
     

– Get the status of the batch execution.

  • Syntax: STATE
      no arguments
  • Returns: The status of the batch execution.
     

Example: In this example, we return partial results every three seconds until the batch execution fails or completes. Then, we return results for a given index row returned as an array.

batchRes <- NULL
while(TRUE) {
   # Send any results, including partial results, for txBatch task
   # Assign it to the batch result variable batchRes: 
   batchRes <- txBatch$results(showPartialResult = TRUE)

   # Check STATUS of the task
   if (batchRes$state == txBatch$STATE$failed) { stop("Batch execution failed") } 
   if (batchRes$state == txBatch$STATE$complete) { break }

   # Repeat check every 3 seconds
   message("Polling for asynchronous batch to complete...")   
   Sys.sleep(3)
}

Get execution results as an array

Get the execution results for a given index row.

Syntax: execution(index)

Argument Description
index Index value for a given batch data record

Returns: The execution results for a given index row as an array.

Example: In this example, we return partial results every three seconds until the batch execution fails or completes. Then, we return results for a given index row returned as an array.

# In a loop, get results for a given index row returned as an array in a loop
# assign it to 'exe' and output a data frame for each row.
for(i in seq(batchRes$totalItemCount)) {
   answer <- batchRes$execution(1)$outputParameters$answer
   answer
}

Get list of generated artifacts

Retrieve a list of every artifact that was generated during the batch execution for a given data record, or index, with listArtifacts(). This function can be made part of a loop to get the list of the artifacts for every data record (see workflow example for a loop).

Syntax: listArtifacts(index)

Argument Description
index Index value for a given batch data record

Returns: A list of every artifact that was generated during the batch execution for a given data record

Example: files <- txBatch$listArtifacts(i)


Display artifact contents

Display the contents of a named artifact returned in the preceding list with artifact(). Machine Learning Server returns the ID for the named batch object.

Syntax: artifact(index, fileName)

Argument Description
index Index value for a given batch data record
fileName Name of file artifact created during batch execution

Returns: The ID for the named batch object

Example:

for(i in seq(batchRes$totalItemCount)) {
       for (fileName in files) {
          content <- txBatch$artifact(i, fileName)
          if (is.null(content)) { stop("Unable to get artifact") }
       }
}       

Download generated artifacts

Download any artifacts from a specific execution index using download(). By default, artifacts are downloaded to the current working directory getwd() unless a different dest = "<path>" is specified. You can choose to download a specific artifact or all artifacts.

Syntax: download(index, fileName, dest = "<path>")

Argument Description
index Index value for a given batch data record
fileName Name of specific artifact generated during batch execution. If omitted, all artifacts are downloaded for that index.
dest The download directory on your local machine. The default is the current R working directory. The directory must already exist on the local machine.

Returns: The path to each downloaded artifact.

Example:

In this example, we download a named artifact for the fifth index to a specified directory, and then download all artifacts to the default working directory.

#Download a named file for 5th index to the specified directory
txBatch$download(5, "Histogram.png", dest = "C:/bgates/batchfiles/")

# Download all files for a given index to the default current R working directory in a loop
for(i in seq(batchRes$totalItemCount)) {
  txBatch$download(i)
}

See also