오케스트레이터 함수는 다른 오케스트레이터 함수를 하위 오케스트레이션으로 호출할 수 있습니다. 호출(부모) 오케스트레이터의 자식으로 하위 오케스트레이션이 실행되며, 이는 호출자의 관점에서 작업처럼 동작합니다. 즉, 값을 반환하고, 부모가 catch할 수 있는 예외를 throw하며, 자동 다시 시도를 지원할 수 있습니다.
하위 오케스트레이션을 사용하는 경우
다음을 수행해야 하는 경우 하위 오케스트레이션을 사용합니다.
-
재사용 가능한 워크플로 구성 요소 작성: 여러 부모 오케스트레이션이 호출할 수 있도록 다단계 워크플로를 자체 오케스트레이터로 추출합니다.
-
병렬로 오케스트레이션 분산: 동일한 오케스트레이터의 여러 인스턴스를 동시에 실행하고, 모든 인스턴스가 완료될 때까지 대기합니다.
-
복잡한 워크플로 구성: 큰 오케스트레이션을 단일 Long 함수 대신 명명된 테스트 가능한 조각으로 분할합니다.
메모
하위 오케스트레이션은 부모 오케스트레이션과 동일한 앱에서 정의되어야 합니다. 다른 앱에서 오케스트레이션을 호출하려면 대신 HTTP 202 폴링 패턴을 사용합니다. 자세한 내용은 HTTP 기능을 참조하세요.
문서 내용:
하위 오케스트레이션 정의
다음 예제에서는 여러 디바이스를 설정해야 하는 IoT 시나리오를 보여 줍니다. 이 함수는 각 디바이스에 대해 실행되는 설정 워크플로를 나타냅니다.
격리된 작업자 모델
public static async Task DeviceProvisioningOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context, string deviceId)
{
// Step 1: Create an installation package in blob storage and return a SAS URL.
Uri sasUrl = await context.CallActivityAsync<Uri>("CreateInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
await context.CallActivityAsync("SendPackageUrlToDevice", (deviceId, sasUrl));
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
await context.WaitForExternalEvent<bool>("DownloadCompletedAck");
// Step 4: ...
}
진행 중 모델
public static async Task DeviceProvisioningOrchestration(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string deviceId = context.GetInput<string>();
// Step 1: Create an installation package in blob storage and return a SAS URL.
Uri sasUrl = await context.CallActivityAsync<Uri>("CreateInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
await context.CallActivityAsync("SendPackageUrlToDevice", Tuple.Create(deviceId, sasUrl));
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
await context.WaitForExternalEvent<bool>("DownloadCompletedAck");
// Step 4: ...
}
const df = require("durable-functions");
df.app.orchestration("deviceProvisioningOrchestration", function* (context) {
const deviceId = context.df.getInput();
// Step 1: Create an installation package in blob storage and return a SAS URL.
const sasUrl = yield context.df.callActivity("createInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
yield context.df.callActivity("sendPackageUrlToDevice", { id: deviceId, url: sasUrl });
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
yield context.df.waitForExternalEvent("downloadCompletedAck");
// Step 4: ...
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
device_id = context.get_input()
# Step 1: Create an installation package in blob storage and return a SAS URL.
sas_url = yield context.call_activity("CreateInstallationPackage", device_id)
# Step 2: Notify the device that the installation package is ready.
yield context.call_activity("SendPackageUrlToDevice", { "id": device_id, "url": sas_url })
# Step 3: Wait for the device to acknowledge that it has downloaded the new package.
yield context.call_activity("DownloadCompletedAck")
# Step 4: ...
param($Context)
$deviceId = $Context.Input
# Step 1: Create an installation package in blob storage and return a SAS URL.
$sasUrl = Invoke-DurableActivity -FunctionName "CreateInstallationPackage" -Input $deviceId
# Step 2: Notify the device that the installation package is ready.
$deviceInfo = @{
id = $deviceId
url = $sasUrl
}
Invoke-DurableActivity -FunctionName "SendPackageUrlToDevice" -Input $deviceInfo
# Step 3: Wait for the device to acknowledge that it has downloaded the new package.
Start-DurableExternalEventListener -EventName "DownloadCompletedAck"
# Step 4: ...
@FunctionName("DeviceProvisioningOrchestration")
public void deviceProvisioningOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Step 1: Create an installation package in blob storage and return a SAS URL.
String deviceId = ctx.getInput(String.class);
String blobUri = ctx.callActivity("CreateInstallPackage", deviceId, String.class).await();
// Step 2: Notify the device that the installation package is ready.
String[] args = { deviceId, blobUri };
ctx.callActivity("SendPackageUrlToDevice", args).await();
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
ctx.waitForExternalEvent("DownloadCompletedAck").await();
// Step 4: ...
}
using Microsoft.DurableTask;
[DurableTask]
public class DeviceProvisioningOrchestration : TaskOrchestrator<string, object?>
{
public override async Task<object?> RunAsync(TaskOrchestrationContext context, string deviceId)
{
// Step 1: Create an installation package in blob storage and return a SAS URL.
Uri sasUrl = await context.CallActivityAsync<Uri>("CreateInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
await context.CallActivityAsync("SendPackageUrlToDevice", (deviceId, sasUrl.ToString()));
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
await context.WaitForExternalEvent<bool>("DownloadCompletedAck");
// Step 4: ...
return null;
}
}
이 샘플은 .NET, Java 및 Python 대해 표시됩니다.
from durabletask import task
def device_provisioning_orchestrator(ctx: task.OrchestrationContext, device_id: str):
# Step 1: Create an installation package in blob storage and return a SAS URL.
sas_url = yield ctx.call_activity("create_installation_package", input=device_id)
# Step 2: Notify the device that the installation package is ready.
yield ctx.call_activity("send_package_url_to_device", input={"id": device_id, "url": sas_url})
# Step 3: Wait for the device to acknowledge that it has downloaded the new package.
yield ctx.wait_for_external_event("DownloadCompletedAck")
# Step 4: ...
이 샘플은 .NET, Java 및 Python 대해 표시됩니다.
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationContext;
public class DeviceProvisioningOrchestration implements TaskOrchestration {
@Override
public void run(TaskOrchestrationContext ctx) {
String deviceId = ctx.getInput(String.class);
// Step 1: Create an installation package in blob storage and return a SAS URL.
String blobUri = ctx.callActivity("CreateInstallPackage", deviceId, String.class).await();
// Step 2: Notify the device that the installation package is ready.
String[] args = { deviceId, blobUri };
ctx.callActivity("SendPackageUrlToDevice", args).await();
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
ctx.waitForExternalEvent("DownloadCompletedAck").await();
// Step 4: ...
}
}
이 오케스트레이터 함수는 일회성 디바이스 설정에 대해 독립 실행형으로 실행되거나 부모 오케스트레이터는 호출 하위 오케스트레이터 API를 사용하여 하위 오케스트레이션으로 예약할 수 있습니다.
병렬로 하위 오케스트레이션 실행
다음 예제에서는 여러 하위 오케스트레이션을 병렬로 분배하는 부모 오케스트레이터를 보여줍니다. 일부 언어는 부모의 인스턴스 ID와 인덱스에서 파생된 결정적 자식 인스턴스 ID를 사용하여 재생 시 중복 하위 오케스트레이션을 방지합니다.
격리된 작업자 모델
[Function("ProvisionNewDevices")]
public static async Task ProvisionNewDevices(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
string[] deviceIds = await context.CallActivityAsync<string[]>("GetNewDeviceIds");
// Run multiple device provisioning flows in parallel
var provisioningTasks = new List<Task>();
foreach (string deviceId in deviceIds)
{
Task provisionTask = context.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
provisioningTasks.Add(provisionTask);
}
await Task.WhenAll(provisioningTasks);
// ...
}
진행 중 모델
[FunctionName("ProvisionNewDevices")]
public static async Task ProvisionNewDevices(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string[] deviceIds = await context.CallActivityAsync<string[]>("GetNewDeviceIds");
// Run multiple device provisioning flows in parallel
var provisioningTasks = new List<Task>();
foreach (string deviceId in deviceIds)
{
Task provisionTask = context.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
provisioningTasks.Add(provisionTask);
}
await Task.WhenAll(provisioningTasks);
// ...
}
const df = require("durable-functions");
df.app.orchestration("provisionNewDevices", function* (context) {
const deviceIds = yield context.df.callActivity("getNewDeviceIds");
// Run multiple device provisioning flows in parallel
const provisioningTasks = [];
var id = 0;
for (const deviceId of deviceIds) {
const child_id = context.df.instanceId + `:${id}`;
const provisionTask = context.df.callSubOrchestrator(
"deviceProvisioningOrchestration",
deviceId,
child_id
);
provisioningTasks.push(provisionTask);
id++;
}
yield context.df.Task.all(provisioningTasks);
// ...
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
device_IDs = yield context.call_activity("GetNewDeviceIds")
# Run multiple device provisioning flows in parallel
provisioning_tasks = []
id_ = 0
for device_id in device_IDs:
child_id = f"{context.instance_id}:{id_}"
provision_task = context.call_sub_orchestrator("DeviceProvisioningOrchestration", device_id, child_id)
provisioning_tasks.append(provision_task)
id_ += 1
yield context.task_all(provisioning_tasks)
# ...
param($Context)
$deviceIds = Invoke-DurableActivity -FunctionName "GetNewDeviceIds"
# Run multiple device setting up flows in parallel
$provisioningTasks = @()
for ($i = 0; $i -lt $deviceIds.Count; $i++) {
$deviceId = $deviceIds[$i]
$childId = "$($Context.InstanceId):$i"
$provisionTask = Invoke-DurableSubOrchestrator `
-FunctionName "DeviceProvisioningOrchestration" `
-Input $deviceId `
-InstanceId $childId `
-NoWait
$provisioningTasks += $provisionTask
}
Wait-DurableTask -Task $provisioningTasks
# ...
@FunctionName("ProvisionNewDevices")
public void provisionNewDevices(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
List<?> deviceIDs = ctx.getInput(List.class);
// Schedule each device provisioning sub-orchestration to run in parallel
List<Task<Void>> parallelTasks = deviceIDs.stream()
.map(device -> ctx.callSubOrchestrator("DeviceProvisioningOrchestration", device))
.collect(Collectors.toList());
// ...
}
다음 단계
using Microsoft.DurableTask;
[DurableTask]
public class ProvisionNewDevices : TaskOrchestrator<object?, object?>
{
public override async Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
{
string[] deviceIds = await context.CallActivityAsync<string[]>("GetNewDeviceIds");
// Run multiple device provisioning flows in parallel
var provisioningTasks = new List<Task>();
foreach (string deviceId in deviceIds)
{
Task provisionTask = context.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
provisioningTasks.Add(provisionTask);
}
await Task.WhenAll(provisioningTasks);
return null;
}
}
이 샘플은 .NET, Java 및 Python 대해 표시됩니다.
from durabletask import task
def provision_new_devices(ctx: task.OrchestrationContext, _):
device_ids = yield ctx.call_activity("get_new_device_ids")
# Run multiple device provisioning flows in parallel
provisioning_tasks = []
for device_id in device_ids:
provision_task = ctx.call_sub_orchestrator(device_provisioning_orchestrator, input=device_id)
provisioning_tasks.append(provision_task)
yield task.when_all(provisioning_tasks)
이 샘플은 .NET, Java 및 Python 대해 표시됩니다.
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationContext;
public class ProvisionNewDevices implements TaskOrchestration {
@Override
public void run(TaskOrchestrationContext ctx) {
List<?> deviceIDs = ctx.getInput(List.class);
// Schedule each device provisioning sub-orchestration to run in parallel
List<Task<Void>> parallelTasks = deviceIDs.stream()
.map(device -> ctx.callSubOrchestrator("DeviceProvisioningOrchestration", device))
.collect(Collectors.toList());
ctx.allOf(parallelTasks).await();
}
}
다음 단계