Per i processi in background, spesso è necessario assicurarsi che venga eseguita simultaneamente una sola istanza di un agente di orchestrazione specifico, impedendo l'esecuzione simultanea di orchestrazioni duplicate. È possibile implementare questo modello singleton negli SDK delle attività Durable Functions o Durable Task SDK assegnando un ID istanza specifico a un agente di orchestrazione durante la creazione e quindi verificando se un'istanza con tale ID è già in esecuzione prima di avviarne una nuova.
Questo articolo illustra come implementare agenti di orchestrazione singleton con esempi di codice per ogni linguaggio supportato.
Prerequisiti
Annotazioni
Esiste una potenziale race condition nel modello singleton. Se due client eseguono simultaneamente la logica check-and-start, entrambe le chiamate potrebbero segnalare l'esito positivo, ma viene avviata una sola istanza di orchestrazione. A seconda dei requisiti, questo può avere effetti collaterali indesiderati. Se sono necessarie garanzie rigorose a istanza singola, è consigliabile aggiungere meccanismi di blocco aggiuntivi.
Importante
Attualmente, Durable Task SDK di PowerShell non è disponibile.
Esempio di orchestratore Singleton
L'esempio seguente illustra una funzione di attivazione HTTP che crea un'orchestrazione di processo in background singleton. Il codice tenta di assicurarsi che esista una sola istanza attiva per un ID istanza specificato.
[Function("HttpStartSingle")]
public static async Task<HttpResponseData> RunSingle(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = "orchestrators/{functionName}/{instanceId}")] HttpRequestData req,
[DurableClient] DurableTaskClient starter,
string functionName,
string instanceId,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("HttpStartSingle");
// Check if an instance with the specified ID already exists or an existing one stopped running(completed/failed/terminated).
OrchestrationMetadata? existingInstance = await starter.GetInstanceAsync(instanceId, getInputsAndOutputs: false);
if (existingInstance == null
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Failed
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
{
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
await starter.ScheduleNewOrchestrationInstanceAsync(functionName, requestBody, new StartOrchestrationOptions { InstanceId = instanceId });
logger.LogInformation($"Started orchestration with ID = '{instanceId}'.");
return await starter.CreateCheckStatusResponseAsync(req, instanceId);
}
else
{
// An instance with the specified ID exists or an existing one still running, don't create one.
var response = req.CreateResponse(HttpStatusCode.Conflict);
await response.WriteStringAsync($"An instance with ID '{instanceId}' already exists.");
return response;
}
}
Annotazioni
Il codice C# precedente riguarda il modello di lavoro isolato, che è il modello consigliato per le app .NET. Per altre informazioni sulle differenze tra i modelli di processi di lavoro isolati e in-process, vedere l'articolo Versioni di Durable Functions.
Importante
Questo esempio JavaScript usa il modello di programmazione Node.js v3, che usa function.json. Se si usa il modello di programmazione Node.js v4 (impostazione predefinita corrente), usare invece il modello 'code-first' v4.
function.json
{
"bindings": [
{
"authLevel": "function",
"name": "req",
"type": "httpTrigger",
"direction": "in",
"route": "orchestrators/{functionName}/{instanceId}",
"methods": ["post"]
},
{
"name": "starter",
"type": "orchestrationClient",
"direction": "in"
},
{
"name": "$return",
"type": "http",
"direction": "out"
}
]
}
index.js
const df = require("durable-functions");
module.exports = async function(context, req) {
const client = df.getClient(context);
const instanceId = req.params.instanceId;
const functionName = req.params.functionName;
// Check if an instance with the specified ID already exists or an existing one stopped running(completed/failed/terminated).
const existingInstance = await client.getStatus(instanceId);
if (!existingInstance
|| existingInstance.runtimeStatus == "Completed"
|| existingInstance.runtimeStatus == "Failed"
|| existingInstance.runtimeStatus == "Terminated") {
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
const eventData = req.body;
await client.startNew(functionName, instanceId, eventData);
context.log(`Started orchestration with ID = '${instanceId}'.`);
return client.createCheckStatusResponse(req, instanceId);
} else {
// An instance with the specified ID exists or an existing one still running, don't create one.
return {
status: 409,
body: `An instance with ID '${instanceId}' already exists.`,
};
}
};
function_app.py
import logging
import azure.functions as func
import azure.durable_functions as df
app = df.DFApp(http_auth_level=func.AuthLevel.FUNCTION)
@app.route(route="orchestrators/{functionName}/{instanceId}", methods=["POST"])
@app.durable_client_input(client_name="client")
async def http_start_single(req: func.HttpRequest, client):
instance_id = req.route_params["instanceId"]
function_name = req.route_params["functionName"]
existing_instance = await client.get_status(instance_id)
if not existing_instance or existing_instance.runtime_status in [
df.OrchestrationRuntimeStatus.Completed,
df.OrchestrationRuntimeStatus.Failed,
df.OrchestrationRuntimeStatus.Terminated,
]:
event_data = req.get_body()
instance_id = await client.start_new(function_name, instance_id, event_data)
logging.info(f"Started orchestration with ID = '{instance_id}'.")
return client.create_check_status_response(req, instance_id)
return func.HttpResponse(
body=f"An instance with ID '{instance_id}' already exists.",
status_code=409,
)
using namespace System.Net
param($Request, $TriggerMetadata)
$FunctionName = $Request.Params.FunctionName
$InstanceId = $Request.Params.InstanceId
# Check if an instance with the specified ID already exists
$existingInstance = Get-DurableStatus -InstanceId $InstanceId
if (-not $existingInstance -or
$existingInstance.RuntimeStatus -eq "Completed" -or
$existingInstance.RuntimeStatus -eq "Failed" -or
$existingInstance.RuntimeStatus -eq "Terminated") {
# An instance with the specified ID doesn't exist or stopped running, create one
$InstanceId = Start-DurableOrchestration -FunctionName $FunctionName -InstanceId $InstanceId -Input $Request.Body
Write-Host "Started orchestration with ID = '$InstanceId'."
$Response = New-DurableOrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId
Push-OutputBinding -Name Response -Value $Response
}
else {
# An instance with the specified ID exists or still running
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::Conflict
Body = "An instance with ID '$InstanceId' already exists."
})
}
@FunctionName("HttpStartSingle")
public HttpResponseMessage runSingle(
@HttpTrigger(name = "req", route = "orchestrators/{functionName}/{instanceId}") HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceId = req.getHeaders().getOrDefault("instanceId", "singleton-job");
DurableTaskClient client = durableContext.getClient();
// Check to see if an instance with this ID is already running
OrchestrationMetadata metadata = client.getInstanceMetadata(instanceId, false);
if (metadata == null || !metadata.isRunning()) {
// No such instance exists or it finished - create a new one.
// De-dupe is handled automatically in the storage layer if another
// function tries to also use this instance ID.
client.scheduleNewOrchestrationInstance("MyOrchestration", null, instanceId);
return durableContext.createCheckStatusResponse(req, instanceId);
}
return req.createResponseBuilder(HttpStatus.CONFLICT)
.body("An instance with ID '" + instanceId + "' already exists.")
.build();
}
L'esempio seguente illustra come creare un'orchestrazione singleton usando gli SDK di Durable Task. Il codice tenta di assicurarsi che esista una sola istanza attiva per un ID istanza specificato.
using Microsoft.DurableTask.Client;
// Check if an instance with the specified ID already exists
string instanceId = "singleton-job";
OrchestrationMetadata? existingInstance = await client.GetInstanceAsync(instanceId, getInputsAndOutputs: false);
if (existingInstance == null ||
existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed ||
existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Failed ||
existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
{
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
await client.ScheduleNewOrchestrationInstanceAsync("MyOrchestration", input, new StartOrchestrationOptions(instanceId));
Console.WriteLine($"Started orchestration with ID = '{instanceId}'.");
}
else
{
// An instance with the specified ID exists or an existing one still running.
Console.WriteLine($"An instance with ID '{instanceId}' already exists.");
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
instance_id = "singleton-job"
# Check if an instance with the specified ID already exists
existing_instance = client.get_orchestration_state(instance_id)
if (existing_instance is None or
existing_instance.runtime_status in ['COMPLETED', 'FAILED', 'TERMINATED']):
# An instance with the specified ID doesn't exist or an existing one stopped running, create one.
client.schedule_new_orchestration(my_orchestration, input=input_data, instance_id=instance_id)
print(f"Started orchestration with ID = '{instance_id}'.")
else:
# An instance with the specified ID exists or an existing one still running.
print(f"An instance with ID '{instance_id}' already exists.")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationMetadata;
String instanceId = "singleton-job";
// Check to see if an instance with this ID is already running
OrchestrationMetadata existingInstance = client.getInstanceMetadata(instanceId, false);
if (existingInstance == null || !existingInstance.isRunning()) {
// An instance doesn't exist or finished - create one
client.scheduleNewOrchestrationInstance("MyOrchestration", input, instanceId);
System.out.println("Started orchestration with ID = '" + instanceId + "'.");
} else {
// An instance with the specified ID exists and is still running
System.out.println("An instance with ID '" + instanceId + "' already exists.");
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus } from "@microsoft/durabletask-js";
const client = createAzureManagedClient(connectionString);
const instanceId = "singleton-job";
// Check if an instance with the specified ID already exists
const existingInstance = await client.getOrchestrationState(instanceId, false);
if (!existingInstance ||
existingInstance.runtimeStatus === OrchestrationStatus.COMPLETED ||
existingInstance.runtimeStatus === OrchestrationStatus.FAILED ||
existingInstance.runtimeStatus === OrchestrationStatus.TERMINATED) {
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
await client.scheduleNewOrchestration("MyOrchestration", input, instanceId);
console.log(`Started orchestration with ID = '${instanceId}'.`);
} else {
// An instance with the specified ID exists or an existing one still running.
console.log(`An instance with ID '${instanceId}' already exists.`);
}
Funzionamento del modello singleton
Poiché gli ID istanza sono univoci all'interno di un hub attività, la pianificazione di un'orchestrazione con un ID fisso noto e il controllo a priori del relativo stato impedisce esecuzioni simultanee duplicate. Per impostazione predefinita, gli ID istanza sono GUID generati in modo casuale. Negli esempi precedenti, tuttavia, viene passato un ID istanza specifico. Il codice recupera quindi i metadati dell'istanza di orchestrazione per verificare se un'istanza con tale ID è già in esecuzione. Se non è in esecuzione alcuna istanza di questo tipo, viene creata una nuova istanza con tale ID.
La funzione dell'agente di orchestrazione può usare qualsiasi modello, ovvero una funzione standard che avvia e completa o un'orchestrazione eterna che viene eseguita continuamente. Il modello singleton controlla solo il numero di istanze eseguite simultaneamente.
Passaggi successivi