Informazioni su Durable Functions
Durable Functions è una funzionalità di Funzioni di Azure che consente di scrivere funzioni con stato in un ambiente di calcolo serverless. L'estensione permette di definire flussi di lavoro con stato, scrivendo funzioni dell'agente di orchestrazione, ed entità con stato, scrivendo funzioni di entità tramite il modello di programmazione di Funzioni di Azure. Dietro le quinte, l'estensione gestisce automaticamente lo stato, i checkpoint e i riavvii, consentendo di concentrarsi sulla logica di business.
Lingue supportate
Durable Functions è progettato per essere usato con tutti i linguaggi di programmazione di Funzioni di Azure, benché i requisiti minimi possano essere diversi per ogni linguaggio. La tabella seguente illustra le configurazioni minime supportate per l’app:
Stack linguaggio | Versioni di runtime di Funzioni di Azure | Versione del ruolo di lavoro in base al linguaggio | Versione minima dei bundle |
---|---|---|---|
.NET / C# / F# | Funzioni 1.0+ | In-Process Out-of-process |
n/d |
JavaScript/TypeScript (modello prog V3) | Funzioni 2.0+ | Nodo 8+ | Bundle 2.x |
JavaScript/TypeScript (modello prog V4) | Funzioni 4.25+ | Nodo 18+ | Bundle 3.15+ |
Python | Funzioni 2.0+ | Python 3.7+ | Bundle 2.x |
Python (modello prog V2) | Funzioni 4.0+ | Python 3.7+ | Bundle 3.15+ |
PowerShell | Funzioni 3.0+ | PowerShell 7+ | Bundle 2.x |
Java | Funzioni 4.0+ | Java 8+ | Bundle 4.x |
Importante
Questo articolo usa schede per supportare le versioni diverse del modello di programmazione Node.js. Il modello v4 è disponibile a livello generale ed è progettato per offrire un'esperienza più flessibile e intuitiva per gli sviluppatori JavaScript e TypeScript. Per altre informazioni sul funzionamento del modello v4, vedere la guida per gli sviluppatori di Node.js per Funzioni di Azure. Altre informazioni sulle differenze tra i modelli v3 e v4 sono disponibili nella guida alla migrazione.
Importante
Questo articolo usa le schede per supportare le versioni diverse del modello di programmazione Python. Il modello v2 è disponibile a livello generale ed è progettato per offrire un modo più incentrato sul codice per la creazione di funzioni tramite elementi Decorator. Per altre informazioni sul funzionamento del modello v2, vedere la guida per gli sviluppatori di Python per Funzioni di Azure.
Come per Funzioni di Azure, sono disponibili modelli che permettono di sviluppare Durable Functions tramite Visual Studio, Visual Studio Code e il portale di Azure.
Modelli di applicazione
Durable Functions viene usato principalmente per semplificare i complessi requisiti di coordinamento con stato nelle applicazioni serverless. Le sezioni seguenti descrivono i tipici modelli di applicazione che possono trarre vantaggio da Durable Functions:
- Concatenamento di funzioni
- Fan-out/fan-in
- API HTTP asincrone
- Monitoraggio
- Interazione umana
- Aggregatore (entità con stato)
Modello 1: Concatenamento di funzioni
Nel modello di concatenamento delle funzioni una sequenza di funzioni viene eseguita in un ordine specifico. In questo modello l'output di una funzione viene applicato all'input di un'altra funzione. L'uso di code tra ogni funzione garantisce che il sistema rimanga durevole e scalabile, anche se è disponibile un flusso di controllo tra una funzione e quella successiva.
È possibile usare Durable Functions per implementare il modello di concatenamento di funzioni in modo conciso, come illustrato nell'esempio seguente.
In questo esempio, i valori F1
, F2
, F3
ed F4
sono i nomi di altre funzioni nella stessa app per le funzioni. Il flusso di controllo può essere implementato usando normali costrutti di scrittura del codice imperativa. Il codice viene eseguito dall'alto verso il basso e può implicare semantica esistente del flusso di controllo del linguaggio, ad esempio istruzioni condizionali e cicli. È possibile includere la logica di gestione degli errori nei in blocchi try
/catch
/finally
.
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
È possibile usare il parametro context
per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. Ogni volta che il codice chiama await
, il framework di Durable Functions imposta checkpoint sullo stato di avanzamento dell'istanza della funzione corrente. Se la VM o il processo viene riciclato durante l'esecuzione, l'istanza della funzione riprende dalla chiamata await
precedente. Per altre informazioni, vedere la sezione successiva Modello 2: Fan out/fan in.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
try {
const x = yield context.df.callActivity("F1");
const y = yield context.df.callActivity("F2", x);
const z = yield context.df.callActivity("F3", y);
return yield context.df.callActivity("F4", z);
} catch (error) {
// Error handling or compensation goes here.
}
});
È possibile usare l'oggetto context.df
per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. Ogni volta che il codice chiama yield
, il framework di Durable Functions imposta checkpoint sullo stato di avanzamento dell'istanza della funzione corrente. Se la VM o il processo viene riciclato durante l'esecuzione, l'istanza della funzione riprende dalla chiamata yield
precedente. Per altre informazioni, vedere la sezione successiva Modello 2: Fan out/fan in.
Nota
L'oggetto context
in JavaScript rappresenta l'intero contesto della funzione. Accedere al contesto di Durable Functions usando la proprietà df
nel contesto principale.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
È possibile usare l'oggetto context
per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. Ogni volta che il codice chiama yield
, il framework di Durable Functions imposta checkpoint sullo stato di avanzamento dell'istanza della funzione corrente. Se la VM o il processo viene riciclato durante l'esecuzione, l'istanza della funzione riprende dalla chiamata yield
precedente. Per altre informazioni, vedere la sezione successiva Modello 2: Fan out/fan in.
Nota
L'oggetto context
in Python rappresenta il contesto dell'orchestrazione. Accedere al contesto principale di Funzioni di Azure usando la proprietà function_context
nel contesto di orchestrazione.
param($Context)
$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z
È possibile usare il comando Invoke-DurableActivity
per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. Ogni volta che il codice chiama Invoke-DurableActivity
senza l'opzione NoWait
, il framework di Durable Functions imposta checkpoint sullo stato di avanzamento dell'istanza della funzione corrente. Se la VM o il processo viene riciclato durante l'esecuzione, l'istanza della funzione riprende dalla chiamata Invoke-DurableActivity
precedente. Per altre informazioni, vedere la sezione successiva Modello 2: Fan out/fan in.
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
È possibile usare l'oggetto ctx
per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. L'output di questi metodi è un oggetto Task<V>
in cui V
è il tipo di dati restituiti dalla funzione richiamata. Ogni volta che il codice chiama Task<V>.await()
, il framework di Durable Functions controlla lo stato di avanzamento dell'istanza della funzione corrente tramite checkpoint. Se il processo viene riciclato in modo imprevisto a metà dell'esecuzione, l'istanza della funzione riprende dalla chiamata Task<V>.await()
precedente. Per altre informazioni, vedere la sezione successiva Modello 2: Fan out/fan in.
Modello 2: Fan-out/fan-in
Nel modello Fan-out/fan-in vengono eseguite più funzioni in parallelo e quindi viene atteso il completamento di tutte le funzioni. Alcune operazioni di aggregazione vengono spesso eseguite sui risultati restituiti dalle funzioni.
Con le funzioni normali è possibile eseguire il fan-out facendo in modo che la funzione invii più messaggi a una coda. L'operazione di fan-in è molto più complessa. In una normale funzione si scrive codice per rilevare il completamento delle funzioni attivate dalla coda e quindi archiviarne l'output.
L'estensione Durable Functions gestisce questo modello con codice relativamente semplice:
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
L'operazione di fan-out viene distribuita a più istanze della funzione F2
e viene monitorata tramite un elenco dinamico di attività. Viene effettuata una chiamata a Task.WhenAll
per attendere il completamento di tutte le funzioni chiamate. Quindi, gli output della funzione F2
vengono aggregati dall'elenco dinamico di attività e passati alla funzione F3
.
L'impostazione automatica di checkpoint che avviene alla chiamata di await
su Task.WhenAll
assicura che qualsiasi potenziale riavvio o arresto anomalo del sistema durante l'esecuzione non richieda il riavvio di un'attività già completata.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
L'operazione di fan-out viene distribuita a più istanze della funzione F2
e viene monitorata tramite un elenco dinamico di attività. Viene chiamata l'API context.df.Task.all
per attendere il completamento di tutte le funzioni chiamate. Quindi, gli output della funzione F2
vengono aggregati dall'elenco dinamico di attività e passati alla funzione F3
.
L'impostazione automatica di checkpoint che avviene alla chiamata di yield
su context.df.Task.all
assicura che qualsiasi potenziale riavvio o arresto anomalo del sistema durante l'esecuzione non richieda il riavvio di un'attività già completata.
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
L'operazione di fan-out viene distribuita a più istanze della funzione F2
e viene monitorata tramite un elenco dinamico di attività. Viene chiamata l'API context.task_all
per attendere il completamento di tutte le funzioni chiamate. Quindi, gli output della funzione F2
vengono aggregati dall'elenco dinamico di attività e passati alla funzione F3
.
L'impostazione automatica di checkpoint che avviene alla chiamata di yield
su context.task_all
assicura che qualsiasi potenziale riavvio o arresto anomalo del sistema durante l'esecuzione non richieda il riavvio di un'attività già completata.
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
L'operazione di fan-out viene distribuita a più istanze della funzione F2
Si noti l'utilizzo dell'opzione NoWait
per la chiamata della funzione F2
: questa opzione consente all'agente di orchestrazione di continuare a richiamare F2
senza attendere il completamento dell'attività. e viene monitorata tramite un elenco dinamico di attività. Il comando Wait-ActivityFunction
viene chiamato per attendere il completamento di tutte le funzioni chiamate. Quindi, gli output della funzione F2
vengono aggregati dall'elenco dinamico di attività e passati alla funzione F3
.
L'impostazione automatica di checkpoint che avviene alla chiamata di Wait-ActivityFunction
assicura che qualsiasi potenziale riavvio o arresto anomalo del sistema durante l'esecuzione non richieda il riavvio di un'attività già completata.
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
L'operazione di fan-out viene distribuita a più istanze della funzione F2
e viene monitorata tramite un elenco dinamico di attività. Viene effettuata una chiamata a ctx.allOf(parallelTasks).await()
per attendere il completamento di tutte le funzioni chiamate. Gli output della funzione F2
vengono quindi aggregati dall'elenco di attività dinamici e restituiti come output della funzione dell'agente di orchestrazione.
Il controllo tramite checkpoint automatico che si verifica al momento della chiamata .await()
a ctx.allOf(parallelTasks)
garantisce che un riciclo imprevisto del processo non richieda il riavvio delle attività già completate.
Nota
In rari casi, è possibile che si verifichi un arresto anomalo nella finestra dopo che una funzione di attività è stata completata, ma prima che il completamento venga salvato nella cronologia dell'orchestrazione. In tal caso, la funzione di attività verrebbe rieseguita dall'inizio dopo il ripristino del processo.
Modello 3: API HTTP asincrone
Il modello API HTTP asincrone risolve il problema di coordinare lo stato di operazioni a esecuzione prolungata con client esterni. Un modo comune per implementare questo modello è fare in modo che un endpoint HTTP attivi l'azione a esecuzione prolungata. Reindirizzare quindi il client a un endpoint di stato di cui il client esegue il polling per sapere quando termina l'operazione.
Durable Functions offre supporto incorporato per questo modello, semplificando o anche rimuovendo il codice che è necessario scrivere per interagire con le funzioni a esecuzione prolungata. Gli esempi riportati nella guida introduttiva di Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell e Java) mostrano un semplice comando REST che è possibile usare per avviare nuove istanze delle funzioni dell’agente di orchestrazione. Dopo l'avvio di un'istanza, l'estensione espone le API HTTP del webhook che eseguono query sullo stato della funzione di orchestrazione.
L'esempio seguente mostra i comandi REST per avviare un agente di orchestrazione e per eseguire query sul relativo stato. Per maggiore chiarezza, alcuni dettagli sono stati omessi dall'esempio.
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}
Poiché lo stato è gestito automaticamente dal runtime di Durable Functions, non è necessario implementare un meccanismo personalizzato di monitoraggio.
L'estensione Durable Functions espone le API HTTP predefinite che gestiscono le orchestrazioni a esecuzione prolungata. In alternativa, è possibile implementare questo modello direttamente usando trigger di funzione personalizzati (ad esempio HTTP, una coda o Hub eventi di Azure) e il binding del client durevole. Ad esempio, è possibile usare un messaggio di coda per attivare la terminazione. In alternativa, è possibile usare un trigger HTTP protetto da un criterio di autenticazione di Microsoft Entra anziché le API HTTP predefinite che usano una chiave generata per l'autenticazione.
Per altre informazioni, vedere l'articolo sulle funzionalità HTTP, che spiega come esporre i processi asincroni a esecuzione prolungata su HTTP usando l'estensione Durable Functions.
Modello 4: Monitoraggio
Il modello Monitoraggio si riferisce a un processo flessibile e ricorrente in un flusso di lavoro. Un esempio è il polling fino a quando non vengono soddisfatte condizioni specifiche. È possibile usare un normale trigger timer per gestire un semplice scenario, ad esempio un processo di pulizia periodico, ma l'intervallo è statico e la gestione delle durate delle istanze diventa complessa. È possibile usare Durable Functions per creare intervalli di ricorrenza flessibili, gestire la durata delle attività e creare più processi di monitoraggio da una singola orchestrazione.
Un esempio di modello di monitoraggio consiste nell'invertire lo scenario precedente delle API HTTP asincrone. Invece di esporre un endpoint per un client esterno per monitorare un'operazione a esecuzione prolungata, il monitoraggio dell'esecuzione prolungata utilizza un endpoint esterno, in attesa di un cambiamento di stato.
In poche righe di codice, è possibile usare Durable Functions per creare più monitoraggi che osservano endpoint arbitrari. I monitoraggi possono terminare l'esecuzione quando viene soddisfatta una condizione oppure un'altra funzione può usare il client di orchestrazione durevole per terminare i monitoraggi. È possibile cambiare l'intervallo wait
di un monitoraggio in base a una condizione specifica, ad esempio un backoff esponenziale.
Il codice seguente implementa un monitoraggio di base:
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
Quando viene ricevuta una richiesta, viene creata una nuova istanza di orchestrazione per tale ID processo. L'istanza esegue il polling di uno stato fino a quando non viene soddisfatta una condizione o fino alla scadenza di un timeout. Un timer di Durable Functions controlla l'intervallo di polling. È quindi possibile eseguire altre operazioni o terminare l'orchestrazione.
Modello 5: Interazione umana
Molti processi automatizzati implicano una qualche interazione umana. Il problema in un processo automatizzato con interazione umana è che le persone non sono sempre disponibili e reattive quanto i servizi cloud. Un processo automatizzato può tener conto di questa interazione usando i timeout e la logica di compensazione.
Un processo di approvazione è un esempio di processo aziendale che implica interazione umana. Potrebbe essere richiesta l'approvazione di un manager per una nota spese che supera un determinato importo. Se il responsabile non approva la nota spese entro 72 ore (ad esempio perché è in ferie), viene avviato un processo di escalation per ottenere l'approvazione da parte di qualcun altro, magari il suo diretto superiore.
È possibile implementare il modello in questo esempio usando una funzione di orchestrazione. L'agente di orchestrazione usa un timer durevole per richiedere l'approvazione. in caso di timeout, esegue l'escalation. L'agente di orchestrazione attende che si verifichi un evento esterno, ad esempio una notifica generata tramite interazione umana.
Questi esempi creano un processo di approvazione per illustrare il modello di interazione umana:
[FunctionName("ApprovalWorkflow")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}
Per creare il timer durevole, chiamare context.CreateTimer
. La notifica viene ricevuta da context.WaitForExternalEvent
. Viene quindi effettuata una chiamata a Task.WhenAny
per stabilire se attivare l'escalation (si verifica prima il timeout) o elaborare l'approvazione (l'approvazione viene ricevuta prima del timeout).
const df = require("durable-functions");
const moment = require('moment');
module.exports = df.orchestrator(function*(context) {
yield context.df.callActivity("RequestApproval");
const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
const durableTimeout = context.df.createTimer(dueTime.toDate());
const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
if (winningEvent === approvalEvent) {
durableTimeout.cancel();
yield context.df.callActivity("ProcessApproval", approvalEvent.result);
} else {
yield context.df.callActivity("Escalate");
}
});
Per creare il timer durevole, chiamare context.df.createTimer
. La notifica viene ricevuta da context.df.waitForExternalEvent
. Viene quindi effettuata una chiamata a context.df.Task.any
per stabilire se attivare l'escalation (si verifica prima il timeout) o elaborare l'approvazione (l'approvazione viene ricevuta prima del timeout).
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
yield context.call_activity("RequestApproval", None)
due_time = context.current_utc_datetime + timedelta(hours=72)
durable_timeout_task = context.create_timer(due_time)
approval_event_task = context.wait_for_external_event("ApprovalEvent")
winning_task = yield context.task_any([approval_event_task, durable_timeout_task])
if approval_event_task == winning_task:
durable_timeout_task.cancel()
yield context.call_activity("ProcessApproval", approval_event_task.result)
else:
yield context.call_activity("Escalate", None)
main = df.Orchestrator.create(orchestrator_function)
Per creare il timer durevole, chiamare context.create_timer
. La notifica viene ricevuta da context.wait_for_external_event
. Viene quindi effettuata una chiamata a context.task_any
per stabilire se attivare l'escalation (si verifica prima il timeout) o elaborare l'approvazione (l'approvazione viene ricevuta prima del timeout).
param($Context)
$output = @()
$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId
$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId
$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait
$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any
if ($approvalEvent -eq $firstEvent) {
Stop-DurableTimerTask -Task $durableTimeoutEvent
$output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
$output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}
$output
Per creare il timer durevole, chiamare Start-DurableTimer
. La notifica viene ricevuta da Start-DurableExternalEventListener
. Viene quindi effettuata una chiamata a Wait-DurableTask
per stabilire se attivare l'escalation (si verifica prima il timeout) o elaborare l'approvazione (l'approvazione viene ricevuta prima del timeout).
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
La chiamata al metodo ctx.waitForExternalEvent(...).await()
sospende l'orchestrazione fino a quando non riceve un evento denominato ApprovalEvent
, che ha un payload boolean
. Se l'evento viene ricevuto, viene chiamata una funzione dell’attività per elaborare il risultato dell'approvazione. Tuttavia, se non viene ricevuto alcun evento di questo tipo prima della scadenza timeout
(72 ore), viene generato un TaskCanceledException
e viene chiamata la funzione dell’attività Escalate
.
Nota
Non è previsto alcun addebito per il tempo impiegato per l'attesa di eventi esterni durante l'esecuzione nel piano a consumo.
Un client esterno può recapitare la notifica degli eventi a una funzione dell'agente di orchestrazione in attesa usando le API HTTP predefinite:
curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"
Un evento può essere generato anche usando il client di orchestrazione durevole di un'altra funzione nella stessa app per le funzioni:
[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
[HttpTrigger] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
bool isApproved = true;
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");
module.exports = async function (context) {
const client = df.getClient(context);
const isApproved = true;
await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df
async def main(client: str):
durable_client = df.DurableOrchestrationClient(client)
is_approved = True
await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"
@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
@HttpTrigger(name = "instanceId") String instanceId,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
client.raiseEvent(instanceId, "ApprovalEvent", true);
}
Modello 6: Aggregatore (entità con stato)
Il sesto modello riguarda l'aggregazione in una singola entità indirizzabile dei dati degli eventi relativi a un periodo di tempo. In questo modello i dati aggregati possono provenire da più origini, possono essere recapitati in batch o possono essere distribuiti nel tempo. È possibile che l'aggregatore debba intervenire sui dati degli eventi non appena arrivano ed eseguire query sui dati aggregati per i client esterni.
Se si prova a implementare questo modello con normali funzioni senza stato, l'aspetto più difficile è che il controllo della concorrenza diventa un problema enorme. Non solo è necessario preoccuparsi di più thread che modificano contemporaneamente gli stessi dati, ma bisogna anche assicurarsi che l'aggregatore venga eseguito solo in una singola macchina virtuale alla volta.
È possibile usare entità durevoli per implementare facilmente questo modello come una singola funzione.
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
Le entità durevoli possono anche essere modellate come classi in .NET. Questo modello può rivelarsi utile se l'elenco delle operazioni è fisso e diventa grande. L'esempio seguente è un'implementazione equivalente dell'entità Counter
tramite classi e metodi .NET.
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");
module.exports = df.entity(function(context) {
const currentValue = context.df.getState(() => 0);
switch (context.df.operationName) {
case "add":
const amount = context.df.getInput();
context.df.setState(currentValue + amount);
break;
case "reset":
context.df.setState(0);
break;
case "get":
context.df.return(currentValue);
break;
}
});
import azure.functions as func
import azure.durable_functions as df
def entity_function(context: df.DurableOrchestrationContext):
current_value = context.get_state(lambda: 0)
operation = context.operation_name
if operation == "add":
amount = context.get_input()
current_value += amount
context.set_result(current_value)
elif operation == "reset":
current_value = 0
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
main = df.Entity.create(entity_function)
Nota
Le entità durevoli non sono attualmente supportate in PowerShell.
Nota
Le entità durevoli non sono attualmente supportate in Java.
I client possono accodare operazioni ("segnalazione") per una funzione di entità tramite il binding del client di entità.
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
Nota
In .NET sono anche disponibili proxy generati dinamicamente per la segnalazione di entità in modo indipendente dai tipi. Oltre alla segnalazione, i client possono anche eseguire query per ottenere lo stato di una funzione di entità tramite metodi indipendenti dai tipi sul binding del client di orchestrazione.
const df = require("durable-functions");
const { app } = require("@azure/functions");
module.exports = async function (context) {
const client = df.getClient(context);
const entityId = new df.EntityId("Counter", "myCounter");
await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
entity_id = df.EntityId("Counter", "myCounter")
instance_id = await client.signal_entity(entity_id, "add", 1)
return func.HttpResponse("Entity signaled")
Le funzioni dell’entità sono disponibili in Durable Functions 2.0 e versioni successive per C#, JavaScript e Python.
La tecnologia
L'estensione Durable Functions è basata su Durable Task Framework, una libreria open source disponibile in GitHub usata per creare flussi di lavoro nel codice. Così come Funzioni di Azure è l'evoluzione serverless di Processi Web di Azure, Durable Functions è l'evoluzione serverless di Durable Task Framework. In Microsoft e in altre organizzazioni il sistema Durable Task Framework viene ampiamente usato per automatizzare i processi cruciali. È una scelta ideale per l'ambiente senza server di Funzioni di Azure.
Vincoli di codice
Per offrire garanzie di esecuzione affidabili e a esecuzione prolungata, le funzioni dell'agente di orchestrazione prevedono un set di regole di codifica che devono essere rispettate. Per altre informazioni, vedere l'articolo sui vincoli di codice delle funzioni dell'agente di orchestrazione.
Fatturazione
La fatturazione di Durable Functions è analoga a quella di Funzioni di Azure. Per altre informazioni, vedere Prezzi di Funzioni. Quando si eseguono le funzioni dell'agente di orchestrazione nel piano a consumo di Funzioni di Azure, è necessario tenere presenti alcuni aspetti della fatturazione. Per altre informazioni, vedere l'articolo sulla fatturazione di Durable Functions.
Per iniziare immediatamente
È possibile iniziare a usare Durable Functions in meno di 10 minuti completando una di queste esercitazioni introduttive specifiche del linguaggio:
- C# con Visual Studio 2019
- JavaScript con Visual Studio Code
- TypeScript using Visual Studio Code
- Python con Visual Studio Code
- PowerShell con Visual Studio Code
- Java con Maven
In questi argomenti di avvio rapido viene creata e testata in locale una funzione Durable "hello world". Il codice della funzione verrà quindi pubblicato in Azure. La funzione creata orchestra e concatena le chiamate ad altre funzioni.
Pubblicazioni
Durable Functions è sviluppato in collaborazione con Microsoft Research. Di conseguenza, il team di Durable Functions produce attivamente documenti di ricerca e artefatti; tra cui:
- Durable Functions: Semantica per serverless con stato(OOPSLA'21)
- Flussi di lavoro serverless con Durable Functions e Netherite(pre-stampa)
Altre informazioni
Il video seguente illustra i vantaggi di Durable Functions:
Dal momento che Durable Functions è un'estensione avanzata per Funzioni di Azure non è adatta a tutte le applicazioni. Per un confronto con altre tecnologie di orchestrazione di Azure, vedere Confrontare Funzioni di Azure e App per la logica di Azure.