Informazioni su Durable Functions

Durable Functions è un'estensione di Funzioni di Azure che consente di scrivere funzioni con stato in un ambiente di calcolo serverless. L'estensione permette di definire flussi di lavoro con stato, scrivendo funzioni dell'agente di orchestrazione, ed entità con stato, scrivendo funzioni di entità tramite il modello di programmazione di Funzioni di Azure. Dietro le quinte, l'estensione gestisce automaticamente lo stato, i checkpoint e i riavvii, consentendo di concentrarsi sulla logica di business.

Lingue supportate

Durable Functions è progettato per funzionare con tutti i linguaggi di programmazione Funzioni di Azure, ma potrebbe avere requisiti minimi diversi per ogni linguaggio. La tabella seguente illustra le configurazioni minime supportate per le app:

Stack di linguaggio versioni di runtime di Funzioni di Azure Versione del ruolo di lavoro per la lingua Versione minima dei bundle
.NET/ C# / F# Funzioni 1.0+ In-process
Out-of-process
N/D
JavaScript/TypeScript (modello prog V3) Funzioni 2.0+ Nodo 8+ Bundle 2.x
JavaScript/TypeScript (modello prog V4) Funzioni 4.25+ Nodo 18+ 3.15+ bundle
Python Funzioni 2.0+ Python 3.7+ Bundle 2.x
Python (modello prog V2) Funzioni 4.0+ Python 3.7+ 3.15+ bundle
PowerShell Funzioni 3.0+ PowerShell 7+ Bundle 2.x
Java Funzioni 4.0+ Java 8+ Bundle 4.x

Importante

Questo articolo usa le schede per supportare più versioni del modello di programmazione Node.js. Il modello v4 è disponibile a livello generale ed è progettato per offrire un'esperienza più flessibile e intuitiva per gli sviluppatori JavaScript e TypeScript. Per altre informazioni sul funzionamento del modello v4, vedere la guida per sviluppatori Funzioni di Azure Node.js. Per altre informazioni sulle differenze tra v3 e v4, vedere la guida alla migrazione.

Importante

Questo articolo usa le schede per supportare più versioni del modello di programmazione Python. Il modello v2 è disponibile a livello generale ed è progettato per offrire un modo più incentrato sul codice per la creazione di funzioni tramite elementi decorator. Per altre informazioni sul funzionamento del modello v2, vedere la guida per sviluppatori python Funzioni di Azure.

Come Funzioni di Azure, sono disponibili modelli che consentono di sviluppare Funzioni permanenti con Visual Studio, Visual Studio Code e le portale di Azure.

Modelli di applicazione

Durable Functions viene usato principalmente per semplificare i complessi requisiti di coordinamento con stato nelle applicazioni serverless. Le sezioni seguenti descrivono i tipici modelli di applicazione che possono trarre vantaggio da Durable Functions:

Modello 1: Concatenamento di funzioni

Nel modello di concatenamento delle funzioni una sequenza di funzioni viene eseguita in un ordine specifico. In questo modello l'output di una funzione viene applicato all'input di un'altra funzione. L'uso di code tra ogni funzione garantisce che il sistema rimanga durevole e scalabile, anche se esiste un flusso di controllo da una funzione alla successiva.

A diagram of the function chaining pattern

È possibile usare Durable Functions per implementare il modello di concatenamento di funzioni in modo conciso, come illustrato nell'esempio seguente.

In questo esempio, i valori F1, F2, F3 ed F4 sono i nomi di altre funzioni nella stessa app per le funzioni. Il flusso di controllo può essere implementato usando normali costrutti di scrittura del codice imperativa. Il codice viene eseguito dall'alto verso il basso e può implicare semantica esistente del flusso di controllo del linguaggio, ad esempio istruzioni condizionali e cicli. È possibile includere la logica di gestione degli errori nei in blocchi try/catch/finally.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

È possibile usare il parametro context per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. Ogni volta che il codice chiama await, il framework di Durable Functions imposta checkpoint sullo stato di avanzamento dell'istanza della funzione corrente. Se la VM o il processo viene riciclato durante l'esecuzione, l'istanza della funzione riprende dalla chiamata await precedente. Per altre informazioni, vedere la sezione successiva Pattern #2: Fan out/fan in.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

È possibile usare l'oggetto context.df per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. Ogni volta che il codice chiama yield, il framework di Durable Functions imposta checkpoint sullo stato di avanzamento dell'istanza della funzione corrente. Se la VM o il processo viene riciclato durante l'esecuzione, l'istanza della funzione riprende dalla chiamata yield precedente. Per altre informazioni, vedere la sezione successiva Pattern #2: Fan out/fan in.

Nota

L'oggetto context in JavaScript rappresenta l'intero contesto della funzione. Accedere al contesto di Durable Functions usando la proprietà df nel contesto principale.

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result


main = df.Orchestrator.create(orchestrator_function)

È possibile usare l'oggetto context per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. Ogni volta che il codice chiama yield, il framework di Durable Functions imposta checkpoint sullo stato di avanzamento dell'istanza della funzione corrente. Se la VM o il processo viene riciclato durante l'esecuzione, l'istanza della funzione riprende dalla chiamata yield precedente. Per altre informazioni, vedere la sezione successiva Pattern #2: Fan out/fan in.

Nota

L'oggetto context in Python rappresenta il contesto dell'orchestrazione. Accedere al contesto principale di Funzioni di Azure usando la proprietà function_context nel contesto di orchestrazione.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

È possibile usare il comando Invoke-DurableActivity per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. Ogni volta che il codice chiama Invoke-DurableActivity senza l'opzione NoWait, il framework di Durable Functions imposta checkpoint sullo stato di avanzamento dell'istanza della funzione corrente. Se la VM o il processo viene riciclato durante l'esecuzione, l'istanza della funzione riprende dalla chiamata Invoke-DurableActivity precedente. Per altre informazioni, vedere la sezione successiva Pattern #2: Fan out/fan in.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

È possibile usare l'oggetto ctx per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. L'output di questi metodi è un Task<V> oggetto in cui V è il tipo di dati restituiti dalla funzione richiamata. Ogni volta che si chiama Task<V>.await(), il framework Durable Functions esegue il checkpoint dello stato di avanzamento dell'istanza della funzione corrente. Se il processo ricicla in modo imprevisto a metà dell'esecuzione, l'istanza della funzione riprende dalla chiamata precedente Task<V>.await() . Per altre informazioni, vedere la sezione successiva Pattern #2: Fan out/fan in.

Modello n. 2: Ventola/ventola in

Nel modello Fan-out/fan-in vengono eseguite più funzioni in parallelo e quindi viene atteso il completamento di tutte le funzioni. Alcune operazioni di aggregazione vengono spesso eseguite sui risultati restituiti dalle funzioni.

A diagram of the fan out/fan pattern

Con le funzioni normali è possibile eseguire il fan-out facendo in modo che la funzione invii più messaggi a una coda. L'operazione di fan-in è molto più complessa. In una normale funzione si scrive codice per rilevare il completamento delle funzioni attivate dalla coda e quindi archiviarne l'output.

L'estensione Durable Functions gestisce questo modello con codice relativamente semplice:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

L'operazione di fan-out viene distribuita a più istanze della funzione F2 e viene monitorata tramite un elenco dinamico di attività. Viene effettuata una chiamata a Task.WhenAll per attendere il completamento di tutte le funzioni chiamate. Quindi, gli output della funzione F2 vengono aggregati dall'elenco dinamico di attività e passati alla funzione F3.

L'impostazione automatica di checkpoint che avviene alla chiamata di await su Task.WhenAll assicura che qualsiasi potenziale riavvio o arresto anomalo del sistema durante l'esecuzione non richieda il riavvio di un'attività già completata.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

L'operazione di fan-out viene distribuita a più istanze della funzione F2 e viene monitorata tramite un elenco dinamico di attività. Viene chiamata l'API context.df.Task.all per attendere il completamento di tutte le funzioni chiamate. Quindi, gli output della funzione F2 vengono aggregati dall'elenco dinamico di attività e passati alla funzione F3.

L'impostazione automatica di checkpoint che avviene alla chiamata di yield su context.df.Task.all assicura che qualsiasi potenziale riavvio o arresto anomalo del sistema durante l'esecuzione non richieda il riavvio di un'attività già completata.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

L'operazione di fan-out viene distribuita a più istanze della funzione F2 e viene monitorata tramite un elenco dinamico di attività. Viene chiamata l'API context.task_all per attendere il completamento di tutte le funzioni chiamate. Quindi, gli output della funzione F2 vengono aggregati dall'elenco dinamico di attività e passati alla funzione F3.

L'impostazione automatica di checkpoint che avviene alla chiamata di yield su context.task_all assicura che qualsiasi potenziale riavvio o arresto anomalo del sistema durante l'esecuzione non richieda il riavvio di un'attività già completata.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

L'operazione di fan-out viene distribuita a più istanze della funzione F2 Si noti l'utilizzo dell'opzione NoWait sulla chiamata alla F2 funzione: questa opzione consente all'agente di orchestrazione di continuare a richiamare F2 senza attendere il completamento dell'attività. e viene monitorata tramite un elenco dinamico di attività. Il comando Wait-ActivityFunction viene chiamato per attendere il completamento di tutte le funzioni chiamate. Quindi, gli output della funzione F2 vengono aggregati dall'elenco dinamico di attività e passati alla funzione F3.

L'impostazione automatica di checkpoint che avviene alla chiamata di Wait-ActivityFunction assicura che qualsiasi potenziale riavvio o arresto anomalo del sistema durante l'esecuzione non richieda il riavvio di un'attività già completata.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

L'operazione di fan-out viene distribuita a più istanze della funzione F2 e viene monitorata tramite un elenco dinamico di attività. Viene effettuata una chiamata a ctx.allOf(parallelTasks).await() per attendere il completamento di tutte le funzioni chiamate. Gli output della F2 funzione vengono quindi aggregati dall'elenco di attività dinamici e restituiti come output della funzione dell'agente di orchestrazione.

Il checkpoint automatico che si verifica alla .await() chiamata su ctx.allOf(parallelTasks) garantisce che un riciclo imprevisto del processo non richieda il riavvio delle attività già completate.

Nota

In rari casi, è possibile che si verifichi un arresto anomalo nella finestra dopo che una funzione di attività è stata completata, ma prima che il completamento venga salvato nella cronologia dell'orchestrazione. In tal caso, la funzione di attività verrebbe rieseguita dall'inizio dopo il ripristino del processo.

Modello 3: API HTTP asincrone

Il modello API HTTP asincrone risolve il problema di coordinare lo stato di operazioni a esecuzione prolungata con client esterni. Un modo comune per implementare questo modello è fare in modo che un endpoint HTTP attivi l'azione a esecuzione prolungata. Reindirizzare quindi il client a un endpoint di stato di cui il client esegue il polling per sapere quando termina l'operazione.

A diagram of the HTTP API pattern

Durable Functions offre supporto incorporato per questo modello, semplificando o anche rimuovendo il codice che è necessario scrivere per interagire con le funzioni a esecuzione prolungata. Ad esempio, gli esempi di avvio rapido di Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell e Java) mostrano un semplice comando REST che è possibile usare per avviare nuove istanze di funzione dell'agente di orchestrazione. Dopo l'avvio di un'istanza, l'estensione espone le API HTTP del webhook che eseguono query sullo stato della funzione di orchestrazione.

L'esempio seguente mostra i comandi REST per avviare un agente di orchestrazione e per eseguire query sul relativo stato. Per maggiore chiarezza, alcuni dettagli sono stati omessi dall'esempio.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Poiché lo stato è gestito automaticamente dal runtime di Durable Functions, non è necessario implementare un meccanismo personalizzato di monitoraggio.

L'estensione Durable Functions espone le API HTTP predefinite che gestiscono le orchestrazioni a esecuzione prolungata. In alternativa, è possibile implementare questo modello manualmente usando trigger di funzione personalizzati (ad esempio HTTP, una coda o Hub eventi di Azure) e l'associazione client durevole. Ad esempio, è possibile usare un messaggio di coda per attivare la terminazione. In alternativa, è possibile usare un trigger HTTP protetto da un criterio di autenticazione di Microsoft Entra anziché dalle API HTTP predefinite che usano una chiave generata per l'autenticazione.

Per altre informazioni, vedere l'articolo sulle funzionalità HTTP, che spiega come esporre i processi asincroni a esecuzione prolungata su HTTP usando l'estensione Durable Functions.

Modello 4: Monitoraggio

Il modello Monitoraggio si riferisce a un processo flessibile e ricorrente in un flusso di lavoro. Un esempio è il polling fino a quando non vengono soddisfatte condizioni specifiche. È possibile usare un normale trigger timer per gestire un semplice scenario, ad esempio un processo di pulizia periodico, ma l'intervallo è statico e la gestione delle durate delle istanze diventa complessa. È possibile usare Durable Functions per creare intervalli di ricorrenza flessibili, gestire la durata delle attività e creare più processi di monitoraggio da una singola orchestrazione.

Un esempio di modello di monitoraggio consiste nell'invertire lo scenario precedente delle API HTTP asincrone. Invece di esporre un endpoint per un client esterno per monitorare un'operazione a esecuzione prolungata, il monitoraggio dell'esecuzione prolungata utilizza un endpoint esterno, in attesa di un cambiamento di stato.

A diagram of the monitor pattern

In poche righe di codice, è possibile usare Durable Functions per creare più monitoraggi che osservano endpoint arbitrari. I monitoraggi possono terminare l'esecuzione quando viene soddisfatta una condizione oppure un'altra funzione può usare il client di orchestrazione durevole per terminare i monitoraggi. È possibile cambiare l'intervallo wait di un monitoraggio in base a una condizione specifica, ad esempio un backoff esponenziale.

Il codice seguente implementa un monitoraggio di base:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

Quando viene ricevuta una richiesta, viene creata una nuova istanza di orchestrazione per tale ID processo. L'istanza esegue il polling di uno stato fino a quando non viene soddisfatta una condizione o fino alla scadenza di un timeout. Un timer di Durable Functions controlla l'intervallo di polling. È quindi possibile eseguire altre operazioni o terminare l'orchestrazione.

Modello 5: Interazione umana

Molti processi automatizzati implicano una qualche interazione umana. Il problema in un processo automatizzato con interazione umana è che le persone non sono sempre disponibili e reattive quanto i servizi cloud. Un processo automatizzato può tener conto di questa interazione usando i timeout e la logica di compensazione.

Un processo di approvazione è un esempio di processo aziendale che implica interazione umana. Potrebbe essere richiesta l'approvazione di un manager per una nota spese che supera un determinato importo. Se il responsabile non approva la nota spese entro 72 ore (potrebbe essere il manager andato in vacanza), un processo di escalation inizia per ottenere l'approvazione da un altro utente (forse il manager del manager).

A diagram of the human interaction pattern

È possibile implementare il modello in questo esempio usando una funzione di orchestrazione. L'agente di orchestrazione usa un timer durevole per richiedere l'approvazione. in caso di timeout, esegue l'escalation. L'agente di orchestrazione attende che si verifichi un evento esterno, ad esempio una notifica generata tramite interazione umana.

Questi esempi creano un processo di approvazione per illustrare il modello di interazione umana:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Per creare il timer durevole, chiamare context.CreateTimer. La notifica viene ricevuta da context.WaitForExternalEvent. Viene quindi effettuata una chiamata a Task.WhenAny per stabilire se attivare l'escalation (si verifica prima il timeout) o elaborare l'approvazione (l'approvazione viene ricevuta prima del timeout).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Per creare il timer durevole, chiamare context.df.createTimer. La notifica viene ricevuta da context.df.waitForExternalEvent. Viene quindi effettuata una chiamata a context.df.Task.any per stabilire se attivare l'escalation (si verifica prima il timeout) o elaborare l'approvazione (l'approvazione viene ricevuta prima del timeout).

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Per creare il timer durevole, chiamare context.create_timer. La notifica viene ricevuta da context.wait_for_external_event. Viene quindi effettuata una chiamata a context.task_any per stabilire se attivare l'escalation (si verifica prima il timeout) o elaborare l'approvazione (l'approvazione viene ricevuta prima del timeout).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Per creare il timer durevole, chiamare Start-DurableTimer. La notifica viene ricevuta da Start-DurableExternalEventListener. Viene quindi effettuata una chiamata a Wait-DurableTask per stabilire se attivare l'escalation (si verifica prima il timeout) o elaborare l'approvazione (l'approvazione viene ricevuta prima del timeout).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

La ctx.waitForExternalEvent(...).await() chiamata al metodo sospende l'orchestrazione fino a quando non riceve un evento denominato ApprovalEvent, che ha un boolean payload. Se l'evento viene ricevuto, viene chiamata una funzione di attività per elaborare il risultato dell'approvazione. Tuttavia, se non viene ricevuto alcun evento di questo tipo prima della timeout scadenza (72 ore), viene generato un e TaskCanceledException viene chiamata la Escalate funzione di attività.

Nota

Non è previsto alcun addebito per il tempo impiegato per l'attesa di eventi esterni durante l'esecuzione nel piano a consumo.

Un client esterno può recapitare la notifica degli eventi a una funzione dell'agente di orchestrazione in attesa usando le API HTTP predefinite:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

Un evento può essere generato anche usando il client di orchestrazione durevole di un'altra funzione nella stessa app per le funzioni:

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Modello 6: Aggregatore (entità con stato)

Il sesto modello riguarda l'aggregazione in una singola entità indirizzabile dei dati degli eventi relativi a un periodo di tempo. In questo modello, i dati aggregati possono provenire da più origini, possono essere recapitati in batch o possono essere sparsi per lunghi periodi di tempo. L'aggregatore potrebbe dover intervenire sui dati degli eventi man mano che arriva e i client esterni potrebbero dover eseguire query sui dati aggregati.

Aggregator diagram

Se si prova a implementare questo modello con normali funzioni senza stato, l'aspetto più difficile è che il controllo della concorrenza diventa un problema enorme. Non solo è necessario preoccuparsi di più thread che modificano contemporaneamente gli stessi dati, ma bisogna anche assicurarsi che l'aggregatore venga eseguito solo in una singola macchina virtuale alla volta.

È possibile usare entità durevoli per implementare facilmente questo modello come una singola funzione.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Le entità durevoli possono anche essere modellate come classi in .NET. Questo modello può rivelarsi utile se l'elenco delle operazioni è fisso e diventa grande. L'esempio seguente è un'implementazione equivalente dell'entità Counter tramite classi e metodi .NET.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Nota

Le entità durevoli non sono attualmente supportate in PowerShell.

Nota

Le entità durevoli non sono attualmente supportate in Java.

I client possono accodare operazioni ("segnalazione") per una funzione di entità tramite il binding del client di entità.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Nota

In .NET sono anche disponibili proxy generati dinamicamente per la segnalazione di entità in modo indipendente dai tipi. Oltre alla segnalazione, i client possono anche eseguire query per ottenere lo stato di una funzione di entità tramite metodi indipendenti dai tipi sul binding del client di orchestrazione.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Nota

Le entità durevoli non sono attualmente supportate in PowerShell.

Nota

Le entità durevoli non sono attualmente supportate in Java.

Le funzioni di entità sono disponibili in Durable Functions 2.0 e versioni successive per C#, JavaScript e Python.

La tecnologia

L'estensione Durable Functions è basata su Durable Task Framework, una libreria open source disponibile in GitHub usata per creare flussi di lavoro nel codice. Così come Funzioni di Azure è l'evoluzione serverless di Processi Web di Azure, Durable Functions è l'evoluzione serverless di Durable Task Framework. In Microsoft e in altre organizzazioni il sistema Durable Task Framework viene ampiamente usato per automatizzare i processi cruciali. È una scelta ideale per l'ambiente senza server di Funzioni di Azure.

Vincoli di codice

Per offrire garanzie di esecuzione affidabili e a esecuzione prolungata, le funzioni dell'agente di orchestrazione prevedono un set di regole di codifica che devono essere rispettate. Per altre informazioni, vedere l'articolo sui vincoli di codice delle funzioni dell'agente di orchestrazione.

Fatturazione

La fatturazione di Durable Functions è analoga a quella di Funzioni di Azure. Per altre informazioni, vedere Prezzi di Funzioni. Quando si eseguono le funzioni dell'agente di orchestrazione nel piano a consumo di Funzioni di Azure, è necessario tenere presenti alcuni aspetti della fatturazione. Per altre informazioni, vedere l'articolo sulla fatturazione di Durable Functions.

Per iniziare immediatamente

È possibile iniziare a usare Durable Functions in meno di 10 minuti completando una di queste esercitazioni introduttive specifiche del linguaggio:

In questi argomenti di avvio rapido viene creata e testata in locale una funzione Durable "hello world". Il codice della funzione verrà quindi pubblicato in Azure. La funzione creata orchestra e concatena le chiamate ad altre funzioni.

Pubblicazioni

Durable Functions viene sviluppato in collaborazione con Microsoft Research. Di conseguenza, il team di Durable Functions produce attivamente documenti e artefatti di ricerca; tra cui:

Altre informazioni

Il video seguente illustra i vantaggi di Durable Functions:

Dal momento che Durable Functions è un'estensione avanzata per Funzioni di Azure non è adatta a tutte le applicazioni. Per un confronto con altre tecnologie di orchestrazione di Azure, vedere Confrontare Funzioni di Azure e App per la logica di Azure.

Passaggi successivi