Помимо вызова функций активности, функции оркестратора могут вызывать другие функции оркестратора. Например, можно создать большую оркестрацию из библиотеки небольших функций оркестратора. Кроме того, можно параллельно запускать несколько экземпляров функции оркестратора.
Функция оркестратора вызывает другую оркестрационную функцию с помощью API вызова суб-оркестратора. Дополнительные сведения об автоматической повторной попытке см. в разделе "Обработка ошибок" и "Компенсация".
Функции под оркестратора работают так же, как функции действий с точки зрения вызывающего пользователя. Они могут возвращать значение, а родительская функция оркестратора перехватывает любое исключение, которое они вызывают.
Пример
В следующем примере показан сценарий Интернета вещей ("Интернет вещей"), в котором необходимо настроить несколько устройств. Следующая функция представляет рабочий процесс установки, который выполняется для каждого устройства:
Изолированная рабочая модель
public static async Task DeviceProvisioningOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context, string deviceId)
{
// Step 1: Create an installation package in blob storage and return a SAS URL.
Uri sasUrl = await context.CallActivityAsync<Uri>("CreateInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
await context.CallActivityAsync("SendPackageUrlToDevice", (deviceId, sasUrl));
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
await context.WaitForExternalEvent<bool>("DownloadCompletedAck");
// Step 4: ...
}
Модель внутрипроцессного процесса
public static async Task DeviceProvisioningOrchestration(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string deviceId = context.GetInput<string>();
// Step 1: Create an installation package in blob storage and return a SAS URL.
Uri sasUrl = await context.CallActivityAsync<Uri>("CreateInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
await context.CallActivityAsync("SendPackageUrlToDevice", Tuple.Create(deviceId, sasUrl));
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
await context.WaitForExternalEvent<bool>("DownloadCompletedAck");
// Step 4: ...
}
const df = require("durable-functions");
df.app.orchestration("deviceProvisioningOrchestration", function* (context) {
const deviceId = context.df.getInput();
// Step 1: Create an installation package in blob storage and return a SAS URL.
const sasUrl = yield context.df.callActivity("createInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
yield context.df.callActivity("sendPackageUrlToDevice", { id: deviceId, url: sasUrl });
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
yield context.df.waitForExternalEvent("downloadCompletedAck");
// Step 4: ...
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
device_id = context.get_input()
# Step 1: Create an installation package in blob storage and return a SAS URL.
sas_url = yield context.call_activity("CreateInstallationPackage", device_id)
# Step 2: Notify the device that the installation package is ready.
yield context.call_activity("SendPackageUrlToDevice", { "id": device_id, "url": sas_url })
# Step 3: Wait for the device to acknowledge that it has downloaded the new package.
yield context.call_activity("DownloadCompletedAck")
# Step 4: ...
param($Context)
$deviceId = $Context.Input
# Step 1: Create an installation package in blob storage and return a SAS URL.
$sasUrl = Invoke-DurableActivity -FunctionName "CreateInstallationPackage" -Input $deviceId
# Step 2: Notify the device that the installation package is ready.
$deviceInfo = @{
id = $deviceId
url = $sasUrl
}
Invoke-DurableActivity -FunctionName "SendPackageUrlToDevice" -Input $deviceInfo
# Step 3: Wait for the device to acknowledge that it has downloaded the new package.
Start-DurableExternalEventListener -EventName "DownloadCompletedAck"
# Step 4: ...
@FunctionName("DeviceProvisioningOrchestration")
public void deviceProvisioningOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Step 1: Create an installation package in blob storage and return a SAS URL.
String deviceId = ctx.getInput(String.class);
String blobUri = ctx.callActivity("CreateInstallPackage", deviceId, String.class).await();
// Step 2: Notify the device that the installation package is ready.
String[] args = { deviceId, blobUri };
ctx.callActivity("SendPackageUrlToDevice", args).await();
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
ctx.waitForExternalEvent("DownloadCompletedAck").await();
// Step 4: ...
}
using Microsoft.DurableTask;
[DurableTask]
public class DeviceProvisioningOrchestration : TaskOrchestrator<string, object?>
{
public override async Task<object?> RunAsync(TaskOrchestrationContext context, string deviceId)
{
// Step 1: Create an installation package in blob storage and return a SAS URL.
Uri sasUrl = await context.CallActivityAsync<Uri>("CreateInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
await context.CallActivityAsync("SendPackageUrlToDevice", (deviceId, sasUrl.ToString()));
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
await context.WaitForExternalEvent<bool>("DownloadCompletedAck");
// Step 4: ...
return null;
}
}
Этот пример показан для .NET, Java и Python.
from durabletask import task
def device_provisioning_orchestrator(ctx: task.OrchestrationContext, device_id: str):
# Step 1: Create an installation package in blob storage and return a SAS URL.
sas_url = yield ctx.call_activity("create_installation_package", input=device_id)
# Step 2: Notify the device that the installation package is ready.
yield ctx.call_activity("send_package_url_to_device", input={"id": device_id, "url": sas_url})
# Step 3: Wait for the device to acknowledge that it has downloaded the new package.
yield ctx.wait_for_external_event("DownloadCompletedAck")
# Step 4: ...
Этот пример показан для .NET, Java и Python.
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationContext;
public class DeviceProvisioningOrchestration implements TaskOrchestration {
@Override
public void run(TaskOrchestrationContext ctx) {
String deviceId = ctx.getInput(String.class);
// Step 1: Create an installation package in blob storage and return a SAS URL.
String blobUri = ctx.callActivity("CreateInstallPackage", deviceId, String.class).await();
// Step 2: Notify the device that the installation package is ready.
String[] args = { deviceId, blobUri };
ctx.callActivity("SendPackageUrlToDevice", args).await();
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
ctx.waitForExternalEvent("DownloadCompletedAck").await();
// Step 4: ...
}
}
Эту функцию оркестратора можно использовать в текущем виде для одноразовой установки устройства или она может быть частью более крупной оркестрации. В последнем случае родительская функция оркестратора планирует экземпляры DeviceProvisioningOrchestration с помощью API call-sub-orchestrator .
В следующем примере показано, как параллельно выполнять несколько функций оркестратора.
Изолированная рабочая модель
[Function("ProvisionNewDevices")]
public static async Task ProvisionNewDevices(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
string[] deviceIds = await context.CallActivityAsync<string[]>("GetNewDeviceIds");
// Run multiple device provisioning flows in parallel
var provisioningTasks = new List<Task>();
foreach (string deviceId in deviceIds)
{
Task provisionTask = context.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
provisioningTasks.Add(provisionTask);
}
await Task.WhenAll(provisioningTasks);
// ...
}
Модель внутрипроцессного процесса
[FunctionName("ProvisionNewDevices")]
public static async Task ProvisionNewDevices(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string[] deviceIds = await context.CallActivityAsync<string[]>("GetNewDeviceIds");
// Run multiple device provisioning flows in parallel
var provisioningTasks = new List<Task>();
foreach (string deviceId in deviceIds)
{
Task provisionTask = context.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
provisioningTasks.Add(provisionTask);
}
await Task.WhenAll(provisioningTasks);
// ...
}
const df = require("durable-functions");
df.app.orchestration("provisionNewDevices", function* (context) {
const deviceIds = yield context.df.callActivity("getNewDeviceIds");
// Run multiple device provisioning flows in parallel
const provisioningTasks = [];
var id = 0;
for (const deviceId of deviceIds) {
const child_id = context.df.instanceId + `:${id}`;
const provisionTask = context.df.callSubOrchestrator(
"deviceProvisioningOrchestration",
deviceId,
child_id
);
provisioningTasks.push(provisionTask);
id++;
}
yield context.df.Task.all(provisioningTasks);
// ...
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
device_IDs = yield context.call_activity("GetNewDeviceIds")
# Run multiple device provisioning flows in parallel
provisioning_tasks = []
id_ = 0
for device_id in device_IDs:
child_id = f"{context.instance_id}:{id_}"
provision_task = context.call_sub_orchestrator("DeviceProvisioningOrchestration", device_id, child_id)
provisioning_tasks.append(provision_task)
id_ += 1
yield context.task_all(provisioning_tasks)
# ...
param($Context)
$deviceIds = Invoke-DurableActivity -FunctionName "GetNewDeviceIds"
# Run multiple device setting up flows in parallel
$provisioningTasks = @()
for ($i = 0; $i -lt $deviceIds.Count; $i++) {
$deviceId = $deviceIds[$i]
$childId = "$($Context.InstanceId):$i"
$provisionTask = Invoke-DurableSubOrchestrator `
-FunctionName "DeviceProvisioningOrchestration" `
-Input $deviceId `
-InstanceId $childId `
-NoWait
$provisioningTasks += $provisionTask
}
Wait-DurableTask -Task $provisioningTasks
# ...
@FunctionName("ProvisionNewDevices")
public void provisionNewDevices(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
List<?> deviceIDs = ctx.getInput(List.class);
// Schedule each device provisioning sub-orchestration to run in parallel
List<Task<Void>> parallelTasks = deviceIDs.stream()
.map(device -> ctx.callSubOrchestrator("DeviceProvisioningOrchestration", device))
.collect(Collectors.toList());
// ...
}
Замечание
Подоркестрации должны быть определены в том же функциональном приложении, что и родительская оркестрация. Если вам нужно вызвать и ждать оркестрации в другом функциональном приложении, рассмотрите использование встроенной поддержки API HTTP и шаблона опроса HTTP 202. Дополнительные сведения см. в разделе "Функции HTTP".
Дальнейшие действия
using Microsoft.DurableTask;
[DurableTask]
public class ProvisionNewDevices : TaskOrchestrator<object?, object?>
{
public override async Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
{
string[] deviceIds = await context.CallActivityAsync<string[]>("GetNewDeviceIds");
// Run multiple device provisioning flows in parallel
var provisioningTasks = new List<Task>();
foreach (string deviceId in deviceIds)
{
Task provisionTask = context.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
provisioningTasks.Add(provisionTask);
}
await Task.WhenAll(provisioningTasks);
return null;
}
}
Этот пример показан для .NET, Java и Python.
from durabletask import task
def provision_new_devices(ctx: task.OrchestrationContext, _):
device_ids = yield ctx.call_activity("get_new_device_ids")
# Run multiple device provisioning flows in parallel
provisioning_tasks = []
for device_id in device_ids:
provision_task = ctx.call_sub_orchestrator(device_provisioning_orchestrator, input=device_id)
provisioning_tasks.append(provision_task)
yield task.when_all(provisioning_tasks)
Этот пример показан для .NET, Java и Python.
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationContext;
public class ProvisionNewDevices implements TaskOrchestration {
@Override
public void run(TaskOrchestrationContext ctx) {
List<?> deviceIDs = ctx.getInput(List.class);
// Schedule each device provisioning sub-orchestration to run in parallel
List<Task<Void>> parallelTasks = deviceIDs.stream()
.map(device -> ctx.callSubOrchestrator("DeviceProvisioningOrchestration", device))
.collect(Collectors.toList());
ctx.allOf(parallelTasks).await();
}
}
Дальнейшие действия