W przypadku zadań w tle często trzeba upewnić się, że tylko jedno wystąpienie określonego orkiestratora jest uruchamiane jednocześnie, uniemożliwiając jednoczesne uruchamianie zduplikowanych aranżacji. Ten pojedynczy wzorzec można zaimplementować w Durable Functions lub Durable Task SDK przypisując określony identyfikator wystąpienia do orkiestratora podczas jego tworzenia, a następnie sprawdzając, czy wystąpienie z tym identyfikatorem jest już uruchomione przed uruchomieniem nowego.
W tym artykule pokazano, jak zaimplementować pojedyncze orkiestratory z przykładami kodu dla każdego obsługiwanego języka.
Wymagania wstępne
Uwaga / Notatka
Istnieje potencjalny stan wyścigu we wzorze singleton. Jeśli dwóch klientów wykonuje logikę weryfikacji i uruchomienia jednocześnie, oba wywołania mogą zakończyć się powodzeniem, ale faktycznie uruchamia się tylko jedno wystąpienie orkiestracji. W zależności od wymagań może to mieć niepożądane skutki uboczne. Jeśli wymagane są ścisłe gwarancje dotyczące pojedynczej instancji, rozważ dodanie dodatkowych mechanizmów blokowania.
Ważna
Obecnie zestaw POWERShell Durable Task SDK nie jest dostępny.
Przykład orkiestratora pojedynczego
W poniższym przykładzie przedstawiono funkcję wyzwalacza HTTP, która tworzy pojedynczą aranżację zadań w tle. Kod próbuje upewnić się, że istnieje tylko jedno aktywne wystąpienie dla określonego identyfikatora wystąpienia.
[Function("HttpStartSingle")]
public static async Task<HttpResponseData> RunSingle(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = "orchestrators/{functionName}/{instanceId}")] HttpRequestData req,
[DurableClient] DurableTaskClient starter,
string functionName,
string instanceId,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("HttpStartSingle");
// Check if an instance with the specified ID already exists or an existing one stopped running(completed/failed/terminated).
OrchestrationMetadata? existingInstance = await starter.GetInstanceAsync(instanceId, getInputsAndOutputs: false);
if (existingInstance == null
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Failed
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
{
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
await starter.ScheduleNewOrchestrationInstanceAsync(functionName, requestBody, new StartOrchestrationOptions { InstanceId = instanceId });
logger.LogInformation($"Started orchestration with ID = '{instanceId}'.");
return await starter.CreateCheckStatusResponseAsync(req, instanceId);
}
else
{
// An instance with the specified ID exists or an existing one still running, don't create one.
var response = req.CreateResponse(HttpStatusCode.Conflict);
await response.WriteStringAsync($"An instance with ID '{instanceId}' already exists.");
return response;
}
}
Uwaga / Notatka
Poprzedni kod języka C# jest przeznaczony dla izolowanego modelu roboczego, który jest zalecanym modelem dla aplikacji .NET. Aby uzyskać więcej informacji na temat różnic między modelami roboczymi zintegrowanymi i izolowanymi, zobacz artykuł wersje Durable Functions.
Ważna
W tym przykładzie języka JavaScript użyto modelu programowania Node.js w wersji 3, który używa function.json. Jeśli używasz modelu programowaniaNode.js w wersji 4 (bieżąca wartość domyślna), zamiast tego użyj wzorca opartego na kodzie w wersji 4.
function.json
{
"bindings": [
{
"authLevel": "function",
"name": "req",
"type": "httpTrigger",
"direction": "in",
"route": "orchestrators/{functionName}/{instanceId}",
"methods": ["post"]
},
{
"name": "starter",
"type": "orchestrationClient",
"direction": "in"
},
{
"name": "$return",
"type": "http",
"direction": "out"
}
]
}
index.js
const df = require("durable-functions");
module.exports = async function(context, req) {
const client = df.getClient(context);
const instanceId = req.params.instanceId;
const functionName = req.params.functionName;
// Check if an instance with the specified ID already exists or an existing one stopped running(completed/failed/terminated).
const existingInstance = await client.getStatus(instanceId);
if (!existingInstance
|| existingInstance.runtimeStatus == "Completed"
|| existingInstance.runtimeStatus == "Failed"
|| existingInstance.runtimeStatus == "Terminated") {
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
const eventData = req.body;
await client.startNew(functionName, instanceId, eventData);
context.log(`Started orchestration with ID = '${instanceId}'.`);
return client.createCheckStatusResponse(req, instanceId);
} else {
// An instance with the specified ID exists or an existing one still running, don't create one.
return {
status: 409,
body: `An instance with ID '${instanceId}' already exists.`,
};
}
};
function_app.py
import logging
import azure.functions as func
import azure.durable_functions as df
app = df.DFApp(http_auth_level=func.AuthLevel.FUNCTION)
@app.route(route="orchestrators/{functionName}/{instanceId}", methods=["POST"])
@app.durable_client_input(client_name="client")
async def http_start_single(req: func.HttpRequest, client):
instance_id = req.route_params["instanceId"]
function_name = req.route_params["functionName"]
existing_instance = await client.get_status(instance_id)
if not existing_instance or existing_instance.runtime_status in [
df.OrchestrationRuntimeStatus.Completed,
df.OrchestrationRuntimeStatus.Failed,
df.OrchestrationRuntimeStatus.Terminated,
]:
event_data = req.get_body()
instance_id = await client.start_new(function_name, instance_id, event_data)
logging.info(f"Started orchestration with ID = '{instance_id}'.")
return client.create_check_status_response(req, instance_id)
return func.HttpResponse(
body=f"An instance with ID '{instance_id}' already exists.",
status_code=409,
)
using namespace System.Net
param($Request, $TriggerMetadata)
$FunctionName = $Request.Params.FunctionName
$InstanceId = $Request.Params.InstanceId
# Check if an instance with the specified ID already exists
$existingInstance = Get-DurableStatus -InstanceId $InstanceId
if (-not $existingInstance -or
$existingInstance.RuntimeStatus -eq "Completed" -or
$existingInstance.RuntimeStatus -eq "Failed" -or
$existingInstance.RuntimeStatus -eq "Terminated") {
# An instance with the specified ID doesn't exist or stopped running, create one
$InstanceId = Start-DurableOrchestration -FunctionName $FunctionName -InstanceId $InstanceId -Input $Request.Body
Write-Host "Started orchestration with ID = '$InstanceId'."
$Response = New-DurableOrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId
Push-OutputBinding -Name Response -Value $Response
}
else {
# An instance with the specified ID exists or still running
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::Conflict
Body = "An instance with ID '$InstanceId' already exists."
})
}
@FunctionName("HttpStartSingle")
public HttpResponseMessage runSingle(
@HttpTrigger(name = "req", route = "orchestrators/{functionName}/{instanceId}") HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceId = req.getHeaders().getOrDefault("instanceId", "singleton-job");
DurableTaskClient client = durableContext.getClient();
// Check to see if an instance with this ID is already running
OrchestrationMetadata metadata = client.getInstanceMetadata(instanceId, false);
if (metadata == null || !metadata.isRunning()) {
// No such instance exists or it finished - create a new one.
// De-dupe is handled automatically in the storage layer if another
// function tries to also use this instance ID.
client.scheduleNewOrchestrationInstance("MyOrchestration", null, instanceId);
return durableContext.createCheckStatusResponse(req, instanceId);
}
return req.createResponseBuilder(HttpStatus.CONFLICT)
.body("An instance with ID '" + instanceId + "' already exists.")
.build();
}
W poniższym przykładzie pokazano, jak utworzyć singletonową orkiestrację przy użyciu Durable Task SDK. Kod próbuje upewnić się, że istnieje tylko jedno aktywne wystąpienie dla określonego identyfikatora wystąpienia.
using Microsoft.DurableTask.Client;
// Check if an instance with the specified ID already exists
string instanceId = "singleton-job";
OrchestrationMetadata? existingInstance = await client.GetInstanceAsync(instanceId, getInputsAndOutputs: false);
if (existingInstance == null ||
existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed ||
existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Failed ||
existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
{
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
await client.ScheduleNewOrchestrationInstanceAsync("MyOrchestration", input, new StartOrchestrationOptions(instanceId));
Console.WriteLine($"Started orchestration with ID = '{instanceId}'.");
}
else
{
// An instance with the specified ID exists or an existing one still running.
Console.WriteLine($"An instance with ID '{instanceId}' already exists.");
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
instance_id = "singleton-job"
# Check if an instance with the specified ID already exists
existing_instance = client.get_orchestration_state(instance_id)
if (existing_instance is None or
existing_instance.runtime_status in ['COMPLETED', 'FAILED', 'TERMINATED']):
# An instance with the specified ID doesn't exist or an existing one stopped running, create one.
client.schedule_new_orchestration(my_orchestration, input=input_data, instance_id=instance_id)
print(f"Started orchestration with ID = '{instance_id}'.")
else:
# An instance with the specified ID exists or an existing one still running.
print(f"An instance with ID '{instance_id}' already exists.")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationMetadata;
String instanceId = "singleton-job";
// Check to see if an instance with this ID is already running
OrchestrationMetadata existingInstance = client.getInstanceMetadata(instanceId, false);
if (existingInstance == null || !existingInstance.isRunning()) {
// An instance doesn't exist or finished - create one
client.scheduleNewOrchestrationInstance("MyOrchestration", input, instanceId);
System.out.println("Started orchestration with ID = '" + instanceId + "'.");
} else {
// An instance with the specified ID exists and is still running
System.out.println("An instance with ID '" + instanceId + "' already exists.");
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus } from "@microsoft/durabletask-js";
const client = createAzureManagedClient(connectionString);
const instanceId = "singleton-job";
// Check if an instance with the specified ID already exists
const existingInstance = await client.getOrchestrationState(instanceId, false);
if (!existingInstance ||
existingInstance.runtimeStatus === OrchestrationStatus.COMPLETED ||
existingInstance.runtimeStatus === OrchestrationStatus.FAILED ||
existingInstance.runtimeStatus === OrchestrationStatus.TERMINATED) {
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
await client.scheduleNewOrchestration("MyOrchestration", input, instanceId);
console.log(`Started orchestration with ID = '${instanceId}'.`);
} else {
// An instance with the specified ID exists or an existing one still running.
console.log(`An instance with ID '${instanceId}' already exists.`);
}
Zestaw Durable Task SDK nie jest dostępny dla programu PowerShell. Zamiast tego użyj Durable Functions.
Jak działa wzorzec projektowy singleton
Ponieważ identyfikatory wystąpień są unikatowe w centrum zadań, planowanie aranżacji ze znanym, stałym identyfikatorem i sprawdzaniem jego stanu najpierw zapobiega duplikowaniu współbieżnych uruchomień. Domyślnie identyfikatory wystąpień to losowo generowane identyfikatory GUID. Jednak w poprzednich przykładach jest przekazywany określony identyfikator wystąpienia. Następnie kod pobiera metadane instancji orkiestracji, aby sprawdzić, czy instancja z tym identyfikatorem jest już uruchomiona. Jeśli takie wystąpienie nie jest uruchomione, nowe wystąpienie z tym identyfikatorem zostanie utworzone.
Funkcja orkiestratora może używać dowolnego wzorca — standardowa funkcja, która się rozpoczyna i kończy, lub wieczna orkiestracja, która działa w sposób ciągły. Wzorzec pojedynczy steruje tylko tym, ile wystąpień jest uruchamianych współbieżnie.
Następne kroki