Vad Durable Functions?
Durable Functions är en funktion i Azure Functions som gör att du kan skriva tillståndskänsliga funktioner i en serverlös beräkningsmiljö. Med tillägget kan du definiera tillståndskänsliga arbetsflöden genom att skriva orkestreringsfunktioner och tillståndskänsliga entiteter genom att skriva entitetsfunktioner med hjälp av Azure Functions-programmeringsmodellen. I bakgrunden hanterar tillägget tillstånd, kontrollpunkter och omstarter åt dig, så att du kan fokusera på din affärslogik.
Språk som stöds
Durable Functions är utformat för att fungera med alla Azure Functions-programmeringsspråk men kan ha olika minimikrav för varje språk. I följande tabell visas de minsta appkonfigurationer som stöds:
Språkstacken | Azure Functions Runtime-versioner | Språkarbetarens version | Lägsta paketversion |
---|---|---|---|
.NET/C# /F# | Funktioner 1.0+ | Pågående Out-of-process |
saknas |
JavaScript/TypeScript (V3 prog. model) | Functions 2.0+ | Nod 8+ | 2.x-paket |
JavaScript/TypeScript (V4 prog. model) | Functions 4.25+ | Nod 18+ | 3,15+ paket |
Python | Functions 2.0+ | Python 3.7+ | 2.x-paket |
Python (V2-modell) | Functions 4.0+ | Python 3.7+ | 3,15+ paket |
PowerShell | Functions 3.0+ | PowerShell 7+ | 2.x-paket |
Java | Functions 4.0+ | Java 8+ | 4.x-paket |
Viktigt!
Den här artikeln använder flikar för att stödja flera versioner av Node.js programmeringsmodellen. V4-modellen är allmänt tillgänglig och är utformad för att ha en mer flexibel och intuitiv upplevelse för JavaScript- och TypeScript-utvecklare. Mer information om hur v4-modellen fungerar finns i utvecklarguiden för Azure Functions Node.js. Mer information om skillnaderna mellan v3 och v4 finns i migreringsguiden.
Viktigt!
Den här artikeln använder flikar för att stödja flera versioner av Python-programmeringsmodellen. V2-modellen är allmänt tillgänglig och är utformad för att ge ett mer kodcentrerat sätt att redigera funktioner via dekoratörer. Mer information om hur v2-modellen fungerar finns i utvecklarhandboken för Azure Functions Python.
Precis som i Azure Functions finns det mallar som hjälper dig att utveckla Durable Functions med hjälp av Visual Studio, Visual Studio Code och Azure Portal.
Programmönster
Det huvudsakliga användningsfallet för Durable Functions är att förenkla komplexa, tillståndskänsliga koordinationskrav i serverlösa program. I följande avsnitt beskrivs vanliga programmönster som kan dra nytta av Durable Functions:
- Funktionslänkning
- Förgrening
- Asynkrona HTTP-API:er
- Övervakning
- Mänsklig interaktion
- Aggregator (tillståndskänsliga entiteter)
Mönster nr 1: Funktionslänkning
I funktionslänkningsmönstret körs en sekvens med funktioner i en specifik ordning. I det här mönstret tillämpas utdata från en funktion på indata för en annan funktion. Användningen av köer mellan varje funktion säkerställer att systemet förblir beständigt och skalbart, även om det finns ett flöde av kontroll från en funktion till en annan.
Du kan använda Durable Functions för att implementera funktionslänkningsmönstret kortfattat enligt följande exempel.
I det här exemplet är värdena F1
, F2
, F3
och F4
namnen på andra funktioner i samma funktionsapp. Du kan implementera kontrollflöde med hjälp av normala imperativa kodningskonstruktioner. Koden körs uppifrån och ned. Koden kan omfatta befintlig språkkontrollflödessemantik, till exempel villkor och loopar. Du kan inkludera felhanteringslogik i try
//catch
finally
block.
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
Du kan använda parametern context
för att anropa andra funktioner efter namn, skicka parametrar och returnera funktionsutdata. Varje gång koden anropar await
, styr Durable Functions-ramverket för förloppet för den aktuella funktionsinstansen. Om processen eller den virtuella datorn återanvänds halvvägs genom körningen återupptas funktionsinstansen från föregående await
anrop. Mer information finns i nästa avsnitt, Mönster #2: Fläkta ut/fläkta in.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
try {
const x = yield context.df.callActivity("F1");
const y = yield context.df.callActivity("F2", x);
const z = yield context.df.callActivity("F3", y);
return yield context.df.callActivity("F4", z);
} catch (error) {
// Error handling or compensation goes here.
}
});
Du kan använda context.df
-objektet för att anropa andra funktioner efter namn, skicka parametrar och returnera funktionsutdata. Varje gång koden anropar yield
, styr Durable Functions-ramverket för förloppet för den aktuella funktionsinstansen. Om processen eller den virtuella datorn återanvänds halvvägs genom körningen återupptas funktionsinstansen från föregående yield
anrop. Mer information finns i nästa avsnitt, Mönster #2: Fläkta ut/fläkta in.
Kommentar
Objektet context
i JavaScript representerar hela funktionskontexten. Få åtkomst till Durable Functions-kontexten df
med hjälp av egenskapen i huvudkontexten.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
Du kan använda context
-objektet för att anropa andra funktioner efter namn, skicka parametrar och returnera funktionsutdata. Varje gång koden anropar yield
, styr Durable Functions-ramverket för förloppet för den aktuella funktionsinstansen. Om processen eller den virtuella datorn återanvänds halvvägs genom körningen återupptas funktionsinstansen från föregående yield
anrop. Mer information finns i nästa avsnitt, Mönster #2: Fläkta ut/fläkta in.
Kommentar
Objektet context
i Python representerar orkestreringskontexten. Få åtkomst till azure functions-huvudkontexten function_context
med hjälp av egenskapen i orkestreringskontexten.
param($Context)
$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z
Du kan använda Invoke-DurableActivity
kommandot för att anropa andra funktioner efter namn, skicka parametrar och returnera funktionsutdata. Varje gång koden anropar Invoke-DurableActivity
utan växeln NoWait
, styr Durable Functions-ramverket för förloppet för den aktuella funktionsinstansen. Om processen eller den virtuella datorn återanvänds halvvägs genom körningen återupptas funktionsinstansen från föregående Invoke-DurableActivity
anrop. Mer information finns i nästa avsnitt, Mönster #2: Fläkta ut/fläkta in.
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
Du kan använda ctx
-objektet för att anropa andra funktioner efter namn, skicka parametrar och returnera funktionsutdata. Utdata från dessa metoder är ett Task<V>
objekt där är den typ av data som V
returneras av den anropade funktionen. Varje gång du anropar Task<V>.await()
styr Durable Functions-ramverket för förloppet för den aktuella funktionsinstansen. Om processen oväntat återanvänds halvvägs genom körningen återupptas funktionsinstansen från föregående Task<V>.await()
anrop. Mer information finns i nästa avsnitt, Mönster #2: Fläkta ut/fläkta in.
Mönster nr 2: Fläkta ut/fläkta in
I fläktens ut-/fläkt-mönster kör du flera funktioner parallellt och väntar sedan på att alla funktioner ska slutföras. Ofta utförs en del aggregeringsarbete på de resultat som returneras från funktionerna.
Med normala funktioner kan du fläkta ut genom att funktionen skickar flera meddelanden till en kö. Fanning tillbaka i är mycket mer utmanande. Om du vill använda en normal funktion skriver du kod för att spåra när de köutlösta funktionerna slutar och lagrar sedan funktionsutdata.
Durable Functions-tillägget hanterar det här mönstret med relativt enkel kod:
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
Utloggningsarbetet distribueras till flera instanser av F2
funktionen. Arbetet spåras med hjälp av en dynamisk lista över uppgifter. Task.WhenAll
anropas för att vänta tills alla anropade funktioner har slutförts. F2
Sedan aggregeras funktionsutdata från den dynamiska uppgiftslistan och skickas F3
till funktionen.
Den automatiska kontrollpunkter som sker vid anropet await
på Task.WhenAll
säkerställer att en potentiell mellankrasch eller omstart inte kräver omstart av en aktivitet som redan har slutförts.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
Utloggningsarbetet distribueras till flera instanser av F2
funktionen. Arbetet spåras med hjälp av en dynamisk lista över uppgifter. context.df.Task.all
API:et anropas för att vänta tills alla anropade funktioner har slutförts. F2
Sedan aggregeras funktionsutdata från den dynamiska uppgiftslistan och skickas F3
till funktionen.
Den automatiska kontrollpunkter som sker vid anropet yield
på context.df.Task.all
säkerställer att en potentiell mellankrasch eller omstart inte kräver omstart av en aktivitet som redan har slutförts.
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
Utloggningsarbetet distribueras till flera instanser av F2
funktionen. Arbetet spåras med hjälp av en dynamisk lista över uppgifter. context.task_all
API:et anropas för att vänta tills alla anropade funktioner har slutförts. F2
Sedan aggregeras funktionsutdata från den dynamiska uppgiftslistan och skickas F3
till funktionen.
Den automatiska kontrollpunkter som sker vid anropet yield
på context.task_all
säkerställer att en potentiell mellankrasch eller omstart inte kräver omstart av en aktivitet som redan har slutförts.
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
Utloggningsarbetet distribueras till flera instanser av F2
funktionen. Observera användningen av växeln NoWait
på funktionsanropet F2
: Med den här växeln kan orchestrator fortsätta anropa F2
utan att vänta på att aktiviteten ska slutföras. Arbetet spåras med hjälp av en dynamisk lista över uppgifter. Kommandot Wait-ActivityFunction
anropas för att vänta tills alla anropade funktioner har slutförts. F2
Sedan aggregeras funktionsutdata från den dynamiska uppgiftslistan och skickas F3
till funktionen.
Den automatiska kontrollpunkter som sker vid anropet Wait-ActivityFunction
säkerställer att en potentiell mellankrasch eller omstart inte kräver omstart av en aktivitet som redan har slutförts.
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
Utloggningsarbetet distribueras till flera instanser av F2
funktionen. Arbetet spåras med hjälp av en dynamisk lista över uppgifter. ctx.allOf(parallelTasks).await()
anropas för att vänta tills alla anropade funktioner har slutförts. F2
Sedan aggregeras funktionsutdata från den dynamiska uppgiftslistan och returneras som orchestrator-funktionens utdata.
Den automatiska kontrollpunkter som sker vid anropet .await()
på ctx.allOf(parallelTasks)
säkerställer att en oväntad processåtervinna inte kräver omstart av redan slutförda uppgifter.
Kommentar
I sällsynta fall är det möjligt att en krasch kan inträffa i fönstret när en aktivitetsfunktion har slutförts, men innan den har slutförts sparas den i orkestreringshistoriken. Om detta händer körs aktivitetsfunktionen igen från början när processen har återställts.
Mönster nr 3: Asynkrona HTTP-API:er
Det asynkrona HTTP API-mönstret löser problemet med att samordna tillståndet för långvariga åtgärder med externa klienter. Ett vanligt sätt att implementera det här mönstret är att ha en HTTP-slutpunkt som utlöser den långvariga åtgärden. Omdirigera sedan klienten till en statusslutpunkt som klienten avsöker för att lära sig när åtgärden är klar.
Durable Functions har inbyggt stöd för det här mönstret, vilket förenklar eller till och med tar bort den kod som du behöver skriva för att interagera med långvariga funktionskörningar. Snabbstartsexempel för Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell och Java) visar till exempel ett enkelt REST-kommando som du kan använda för att starta nya orchestrator-funktionsinstanser. När en instans har startats exponerar tillägget http-API:er för webhook som kör frågor mot orkestreringsfunktionens status.
I följande exempel visas REST-kommandon som startar en orkestrerare och frågar efter dess status. För tydlighetens skull utelämnas vissa protokollinformation från exemplet.
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}
Eftersom Durable Functions-körningen hanterar tillståndet åt dig behöver du inte implementera din egen mekanism för statusspårning.
Durable Functions-tillägget exponerar inbyggda HTTP-API:er som hanterar långvariga orkestreringar. Du kan också implementera det här mönstret själv med hjälp av dina egna funktionsutlösare (till exempel HTTP, en kö eller Azure Event Hubs) och den hållbara klientbindningen. Du kan till exempel använda ett kömeddelande för att utlösa avslutning. Eller så kan du använda en HTTP-utlösare som skyddas av en Microsoft Entra-autentiseringsprincip i stället för de inbyggda HTTP-API:er som använder en genererad nyckel för autentisering.
Mer information finns i artikeln OM HTTP-funktioner , som förklarar hur du kan exponera asynkrona, långvariga processer via HTTP med tillägget Durable Functions.
Mönster nr 4: Övervaka
Övervakningsmönstret refererar till en flexibel, återkommande process i ett arbetsflöde. Ett exempel är avsökning tills specifika villkor uppfylls. Du kan använda en vanlig timerutlösare för att hantera ett grundläggande scenario, till exempel ett periodiskt rensningsjobb, men dess intervall är statiskt och hanteringen av instansens livslängd blir komplex. Du kan använda Durable Functions för att skapa flexibla upprepningsintervall, hantera aktivitetslivslängder och skapa flera övervakningsprocesser från en enda orkestrering.
Ett exempel på övervakningsmönstret är att vända det tidigare asynkrona HTTP API-scenariot. I stället för att exponera en slutpunkt för en extern klient för att övervaka en långvarig åtgärd förbrukar den långvariga övervakaren en extern slutpunkt och väntar sedan på en tillståndsändring.
I några rader med kod kan du använda Durable Functions för att skapa flera övervakare som observerar godtyckliga slutpunkter. Övervakarna kan avsluta körningen när ett villkor uppfylls, eller så kan en annan funktion använda den varaktiga orkestreringsklienten för att avsluta övervakarna. Du kan ändra övervakarens wait
intervall baserat på ett specifikt villkor (till exempel exponentiell backoff.)
Följande kod implementerar en grundläggande övervakare:
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
När en begäran tas emot skapas en ny orkestreringsinstans för det jobb-ID:t. Instansen avsöker en status tills antingen ett villkor uppfylls eller tills en tidsgräns upphör att gälla. En varaktig timer styr avsökningsintervallet. Sedan kan mer arbete utföras, eller så kan orkestreringen avslutas.
Mönster nr 5: Mänsklig interaktion
Många automatiserade processer omfattar någon form av mänsklig interaktion. Det är svårt att involvera människor i en automatiserad process eftersom människor inte är lika tillgängliga och lika dynamiska som molntjänster. En automatiserad process kan möjliggöra den här interaktionen med hjälp av timeouter och kompensationslogik.
En godkännandeprocess är ett exempel på en affärsprocess som inbegriper mänsklig interaktion. Godkännande från en chef kan krävas för en utgiftsrapport som överskrider ett visst dollarbelopp. Om chefen inte godkänner utgiftsrapporten inom 72 timmar (kan vara att chefen åkte på semester) startar en eskaleringsprocess för att få godkännande från någon annan (kanske chefens chef).
Du kan implementera mönstret i det här exemplet med hjälp av en orchestrator-funktion. Orchestrator använder en varaktig timer för att begära godkännande. Orkestratorn eskalerar om tidsgränsen uppnås. Orkestratorn väntar på en extern händelse, till exempel ett meddelande som genereras av en mänsklig interaktion.
De här exemplen skapar en godkännandeprocess för att demonstrera det mänskliga interaktionsmönstret:
[FunctionName("ApprovalWorkflow")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}
Om du vill skapa den varaktiga timern anropar du context.CreateTimer
. Meddelandet tas emot av context.WaitForExternalEvent
. Task.WhenAny
Anropas sedan för att avgöra om tidsgränsen ska eskalera (tidsgränsen inträffar först) eller bearbeta godkännandet (godkännandet tas emot före tidsgränsen).
const df = require("durable-functions");
const moment = require('moment');
module.exports = df.orchestrator(function*(context) {
yield context.df.callActivity("RequestApproval");
const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
const durableTimeout = context.df.createTimer(dueTime.toDate());
const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
if (winningEvent === approvalEvent) {
durableTimeout.cancel();
yield context.df.callActivity("ProcessApproval", approvalEvent.result);
} else {
yield context.df.callActivity("Escalate");
}
});
Om du vill skapa den varaktiga timern anropar du context.df.createTimer
. Meddelandet tas emot av context.df.waitForExternalEvent
. context.df.Task.any
Anropas sedan för att avgöra om tidsgränsen ska eskalera (tidsgränsen inträffar först) eller bearbeta godkännandet (godkännandet tas emot före tidsgränsen).
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
yield context.call_activity("RequestApproval", None)
due_time = context.current_utc_datetime + timedelta(hours=72)
durable_timeout_task = context.create_timer(due_time)
approval_event_task = context.wait_for_external_event("ApprovalEvent")
winning_task = yield context.task_any([approval_event_task, durable_timeout_task])
if approval_event_task == winning_task:
durable_timeout_task.cancel()
yield context.call_activity("ProcessApproval", approval_event_task.result)
else:
yield context.call_activity("Escalate", None)
main = df.Orchestrator.create(orchestrator_function)
Om du vill skapa den varaktiga timern anropar du context.create_timer
. Meddelandet tas emot av context.wait_for_external_event
. context.task_any
Anropas sedan för att avgöra om tidsgränsen ska eskalera (tidsgränsen inträffar först) eller bearbeta godkännandet (godkännandet tas emot före tidsgränsen).
param($Context)
$output = @()
$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId
$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId
$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait
$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any
if ($approvalEvent -eq $firstEvent) {
Stop-DurableTimerTask -Task $durableTimeoutEvent
$output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
$output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}
$output
Om du vill skapa den varaktiga timern anropar du Start-DurableTimer
. Meddelandet tas emot av Start-DurableExternalEventListener
. Wait-DurableTask
Anropas sedan för att avgöra om tidsgränsen ska eskalera (tidsgränsen inträffar först) eller bearbeta godkännandet (godkännandet tas emot före tidsgränsen).
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
Metodanropet ctx.waitForExternalEvent(...).await()
pausar orkestreringen tills den tar emot en händelse med namnet ApprovalEvent
, som har en boolean
nyttolast. Om händelsen tas emot anropas en aktivitetsfunktion för att bearbeta godkännanderesultatet. Men om ingen sådan händelse tas emot innan timeout
(72 timmar) upphör att gälla, aktiveras en TaskCanceledException
och aktivitetsfunktionen Escalate
anropas.
Kommentar
Det kostar inget att vänta på externa händelser när de körs i förbrukningsplanen.
En extern klient kan leverera händelsemeddelandet till en väntande orkestreringsfunktion med hjälp av de inbyggda HTTP-API:erna:
curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"
En händelse kan också aktiveras med hjälp av den varaktiga orkestreringsklienten från en annan funktion i samma funktionsapp:
[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
[HttpTrigger] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
bool isApproved = true;
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
import azure.durable_functions as df
async def main(client: str):
durable_client = df.DurableOrchestrationClient(client)
is_approved = True
await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"
@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
@HttpTrigger(name = "instanceId") String instanceId,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
client.raiseEvent(instanceId, "ApprovalEvent", true);
}
Mönster nr 6: Aggregator (tillståndskänsliga entiteter)
Det sjätte mönstret handlar om att aggregera händelsedata under en viss tidsperiod till en enda, adresserbar entitet. I det här mönstret kan de data som aggregeras komma från flera källor, levereras i batchar eller vara spridda över långa tidsperioder. Aggregatorn kan behöva vidta åtgärder för händelsedata när de tas emot, och externa klienter kan behöva köra frågor mot aggregerade data.
Det knepiga med att försöka implementera det här mönstret med normala, tillståndslösa funktioner är att samtidighetskontroll blir en enorm utmaning. Du behöver inte bara oroa dig för att flera trådar ändrar samma data samtidigt, du behöver också oroa dig för att se till att aggregatorn bara körs på en enda virtuell dator i taget.
Du kan använda Durable-entiteter för att enkelt implementera det här mönstret som en enda funktion.
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
Varaktiga entiteter kan också modelleras som klasser i .NET. Den här modellen kan vara användbar om listan över åtgärder är fast och blir stor. Följande exempel är en motsvarande implementering av entiteten Counter
med hjälp av .NET-klasser och -metoder.
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");
module.exports = df.entity(function(context) {
const currentValue = context.df.getState(() => 0);
switch (context.df.operationName) {
case "add":
const amount = context.df.getInput();
context.df.setState(currentValue + amount);
break;
case "reset":
context.df.setState(0);
break;
case "get":
context.df.return(currentValue);
break;
}
});
import azure.functions as func
import azure.durable_functions as df
def entity_function(context: df.DurableOrchestrationContext):
current_value = context.get_state(lambda: 0)
operation = context.operation_name
if operation == "add":
amount = context.get_input()
current_value += amount
context.set_result(current_value)
elif operation == "reset":
current_value = 0
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
main = df.Entity.create(entity_function)
Kommentar
Varaktiga entiteter stöds för närvarande inte i PowerShell.
Kommentar
Varaktiga entiteter stöds för närvarande inte i Java.
Klienter kan ange åtgärder för (kallas även "signalering") för en entitetsfunktion med hjälp av entitetsklientbindningen.
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
Kommentar
Dynamiskt genererade proxyservrar är också tillgängliga i .NET för signalering av entiteter på ett typsäkert sätt. Förutom signalering kan klienter också fråga efter tillståndet för en entitetsfunktion med hjälp av typsäkra metoder för orkestreringsklientbindningen.
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
entity_id = df.EntityId("Counter", "myCounter")
instance_id = await client.signal_entity(entity_id, "add", 1)
return func.HttpResponse("Entity signaled")
Entitetsfunktioner är tillgängliga i Durable Functions 2.0 och senare för C#, JavaScript och Python.
Tekniken
I bakgrunden bygger Durable Functions-tillägget ovanpå Durable Task Framework, ett bibliotek med öppen källkod på GitHub som används för att skapa arbetsflöden i kod. Precis som Azure Functions är den serverlösa utvecklingen av Azure WebJobs är Durable Functions den serverlösa utvecklingen av Durable Task Framework. Microsoft och andra organisationer använder Durable Task Framework i stor utsträckning för att automatisera verksamhetskritiska processer. Det passar naturligt för den serverlösa Azure Functions-miljön.
Kodbegränsningar
För att tillhandahålla tillförlitliga och långvariga körningsgarantier har orkestreringsfunktioner en uppsättning kodningsregler som måste följas. Mer information finns i artikeln om begränsningar för Orchestrator-funktionskod .
Fakturering
Durable Functions debiteras samma som Azure Functions. Mer information finns i prissättning för Azure Functions. När du kör orchestrator-funktioner i Azure Functions-förbrukningsplanen finns det vissa faktureringsbeteenden att känna till. Mer information om dessa beteenden finns i artikeln Durable Functions-fakturering .
Kom igång snabbt
Du kan komma igång med Durable Functions på mindre än 10 minuter genom att slutföra någon av dessa språkspecifika snabbstartsguider:
- C# med Visual Studio 2019
- JavaScript med hjälp av Visual Studio Code
- TypeScript med Visual Studio Code
- Python med Visual Studio Code
- PowerShell med Visual Studio Code
- Java med Maven
I de här snabbstarterna skapar och testar du lokalt en "hello world"-beständig funktion. Du publicerar sedan funktionskoden till Azure. Den funktion som du skapar orkestrerar och kedjar samman anrop till andra funktioner.
Publikationer
Durable Functions har utvecklats i samarbete med Microsoft Research. Som ett resultat producerar Durable Functions-teamet aktivt forskningsdokument och artefakter; dessa inkluderar:
- Durable Functions: Semantik för tillståndskänslig serverlös (OOPSLA'21)
- Serverlösa arbetsflöden med Durable Functions och Netherite (förutskrift)
Läs mer
Följande video visar fördelarna med Durable Functions:
Eftersom Durable Functions är ett avancerat tillägg för Azure Functions är det inte lämpligt för alla program. En jämförelse med andra Azure-orkestreringstekniker finns i avsnittet om att jämföra Azure Functions med Azure Logic Apps.