Wat zijn Durable Functions?
Durable Functions is een functie van Azure Functions waarmee u stateful functies kunt schrijven in een serverloze rekenomgeving. Met de extensie kunt u stateful werkstromen definiëren door Orchestrator-functies te schrijven en stateful entiteiten door entiteitsfuncties te schrijven met behulp van het Azure Functions-programmeermodel. Achter de schermen beheert de extensie status, controlepunten en het opnieuw opstarten, zodat u zich kunt concentreren op uw bedrijf.
Ondersteunde talen
Durable Functions is ontworpen om te werken met alle Azure Functions-programmeertalen, maar heeft mogelijk verschillende minimale vereisten voor elke taal. In de volgende tabel ziet u de minimaal ondersteunde app-configuraties:
Taalstack | Azure Functions Runtime-versies | Versie van taalwerkrol | Minimale bundelversie |
---|---|---|---|
.NET / C# / F# | Functions 1.0+ | In-proces Out-of-process |
n.v.t. |
JavaScript/TypeScript (V3 prog. model) | Functions 2.0+ | Node 8+ | 2.x bundels |
JavaScript/TypeScript (V4 prog. model) | Functions 4.25+ | Node 18+ | 3.15+ bundels |
Python | Functions 2.0+ | Python 3.7+ | 2.x bundels |
Python (V2 prog. model) | Functions 4.0+ | Python 3.7+ | 3.15+ bundels |
Powershell | Functions 3.0+ | PowerShell 7+ | 2.x bundels |
Java | Functions 4.0+ | Java 8+ | 4.x bundels |
Belangrijk
In dit artikel worden tabbladen gebruikt ter ondersteuning van meerdere versies van het Node.js programmeermodel. Het v4-model is algemeen beschikbaar en is ontworpen voor een flexibelere en intuïtievere ervaring voor JavaScript- en TypeScript-ontwikkelaars. Raadpleeg de ontwikkelaarshandleiding voor Azure Functions Node.js voor meer informatie over hoe het v4-model werkt. Raadpleeg de migratiehandleiding voor meer informatie over de verschillen tussen v3 en v4.
Belangrijk
In dit artikel worden tabbladen gebruikt ter ondersteuning van meerdere versies van het Python-programmeermodel. Het v2-model is algemeen beschikbaar en is ontworpen om een meer codegerichte manier te bieden voor het ontwerpen van functies via decorators. Raadpleeg de Ontwikkelaarshandleiding voor Azure Functions Python voor meer informatie over hoe het v2-model werkt.
Net als Azure Functions zijn er sjablonen voor het ontwikkelen van Durable Functions met behulp van Visual Studio, Visual Studio Code en Azure Portal.
Toepassingspatronen
De primaire use case voor Durable Functions is het vereenvoudigen van complexe stateful coördinatievereisten in serverloze toepassingen. In de volgende secties worden enkele typische toepassingspatronen beschreven die kunnen profiteren van Durable Functions:
- Functiekoppeling
- Fan-out/fan-in
- Asynchrone HTTP-API's
- Bewaking
- Menselijke tussenkomst
- Aggregator (stateful-entiteiten)
Patroon 1: Functiekoppeling
In het patroon voor functiekoppeling wordt een reeks functies in een specifieke volg orde uitgevoerd. In dit patroon wordt de uitvoer van een functie toegepast op de invoer van een andere functie. Het gebruik van wachtrijen tussen elke functie zorgt ervoor dat het systeem duurzaam en schaalbaar blijft, ook al is er een stroom van controle van de ene functie naar de volgende.
U kun Durable Functions gebruiken om het patroon voor functiekoppeling bondig te implementeren zoals in het volgende voorbeeld.
In dit voorbeeld zijn de waarden F1
, F2
, F3
en F4
de namen van andere functies in dezelfde functie-app. U kunt een controlestroom implementeren met behulp van normale imperatieve codeconstructies. Code wordt van boven naar beneden uitgevoerd. De code kan bestaande taalsemantiek voor controlestromen bevatten, zoals voorwaarden en lussen. U kunt logica voor foutafhandeling toevoegen in try
/catch
/finally
blokken.
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
U kunt de context
-parameter gebruiken om andere functies aan te roepen op naam, parameters door te geven en functie-uitvoer te retourneren. Telkens de code await
aanroept controleert het Durable Functions-framework de voortgang van de huidige functie-instantie. Als het proces of de virtuele machine halverwege tijdens de uitvoering recycleert, dan begint de functie-instantie opnieuw vanaf de voorgaande await
-aanroep. Zie de volgende sectie Pattern #2: Fan out/fan in voor meer informatie.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
try {
const x = yield context.df.callActivity("F1");
const y = yield context.df.callActivity("F2", x);
const z = yield context.df.callActivity("F3", y);
return yield context.df.callActivity("F4", z);
} catch (error) {
// Error handling or compensation goes here.
}
});
U kunt het context.df
-object gebruiken om andere functies aan te roepen op naam, parameters door te geven en functie-uitvoer te retourneren. Telkens de code yield
aanroept controleert het Durable Functions-framework de voortgang van de huidige functie-instantie. Als het proces of de virtuele machine halverwege tijdens de uitvoering recycleert, dan begint de functie-instantie opnieuw vanaf de voorgaande yield
-aanroep. Zie de volgende sectie Pattern #2: Fan out/fan in voor meer informatie.
Notitie
Het context
-object in Javascript vertegenwoordigt de volledige functiecontext. Open de Durable Functions-context met behulp van de eigenschap df
in de hoofdcontext.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
U kunt het context
-object gebruiken om andere functies aan te roepen op naam, parameters door te geven en functie-uitvoer te retourneren. Telkens de code yield
aanroept controleert het Durable Functions-framework de voortgang van de huidige functie-instantie. Als het proces of de virtuele machine halverwege tijdens de uitvoering recycleert, dan begint de functie-instantie opnieuw vanaf de voorgaande yield
-aanroep. Zie de volgende sectie Pattern #2: Fan out/fan in voor meer informatie.
Notitie
Het context
-object in Python vertegenwoordigt de indelingscontext. Open de Azure Functions-context met behulp van de eigenschap function_context
in de indelingscontext.
param($Context)
$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z
U kunt de opdracht Invoke-DurableActivity
gebruiken om andere functies aan te roepen op naam, parameters door te geven en functie-uitvoer te retourneren. Steeds als de code Invoke-DurableActivity
aanroept zonder de schakeloptie NoWait
controleert het Durable Functions-framework de voortgang van de huidige functie-instantie. Als het proces of de virtuele machine halverwege tijdens de uitvoering recycleert, dan begint de functie-instantie opnieuw vanaf de voorgaande Invoke-DurableActivity
-aanroep. Zie de volgende sectie Pattern #2: Fan out/fan in voor meer informatie.
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
U kunt het ctx
-object gebruiken om andere functies aan te roepen op naam, parameters door te geven en functie-uitvoer te retourneren. De uitvoer van deze methoden is een Task<V>
object waarin het type gegevens is dat V
wordt geretourneerd door de aangeroepen functie. Telkens wanneer u aanroept Task<V>.await()
, controleert het Durable Functions-framework de voortgang van het huidige functie-exemplaar. Als het proces onverwacht halverwege de uitvoering wordt gerecycled, wordt het functie-exemplaar hervat vanaf de voorgaande Task<V>.await()
aanroep. Zie de volgende sectie Pattern #2: Fan out/fan in voor meer informatie.
Patroon 2: Uitwaaier/ventilator in
In het patroon uitwaaieren/inwaaieren voert u meerdere functies parallel uit en wacht u totdat alle functies zijn voltooid. Vaak wordt er een aggregatie toegepast op de resultaten die worden geretourneerd door de functies.
Bij normale functies kunt u uitwaaieren door de functie meerdere berichten te laten sturen naar een wachtrij. Terug inwaaieren is veel lastiger. Om in een normale functie in te waaieren schrijft u code om op te volgen wanneer de in de wachtrij geactiveerde functies aflopen en slaat u vervolgens de functie-uitvoer op.
De Durable Functions-extensie verwerkt dit patroon met relatief eenvoudige code:
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
Het uitwaaierwerk wordt verdeeld over meerdere instanties van de F2
-functie. Het werk wordt opgevolgd met een dynamische takenlijst. Task.WhenAll
wordt aangeroepen om te wachten tot alle aangeroepen functies voltooid zijn. Vervolgens wordt de F2
-functie-uitvoer samengevoegd uit de dynamische takenlijst en doorgegeven naar de F3
-functie.
Het automatisch plaatsen van controlepunten bij de await
-aanroep op Task.WhenAll
zorgt ervoor dat een reeds voltooide taak niet opnieuw moet worden opgestart bij een potentiële crash of reboot halverwege.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
Het uitwaaierwerk wordt verdeeld over meerdere instanties van de F2
-functie. Het werk wordt opgevolgd met een dynamische takenlijst. context.df.Task.all
-API wordt aangeroepen om te wachten tot alle aangeroepen functies voltooid zijn. Vervolgens wordt de F2
-functie-uitvoer samengevoegd uit de dynamische takenlijst en doorgegeven naar de F3
-functie.
Het automatisch plaatsen van controlepunten bij de yield
-aanroep op context.df.Task.all
zorgt ervoor dat een reeds voltooide taak niet opnieuw moet worden opgestart bij een potentiële crash of reboot halverwege.
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
Het uitwaaierwerk wordt verdeeld over meerdere instanties van de F2
-functie. Het werk wordt opgevolgd met een dynamische takenlijst. context.task_all
-API wordt aangeroepen om te wachten tot alle aangeroepen functies voltooid zijn. Vervolgens wordt de F2
-functie-uitvoer samengevoegd uit de dynamische takenlijst en doorgegeven naar de F3
-functie.
Het automatisch plaatsen van controlepunten bij de yield
-aanroep op context.task_all
zorgt ervoor dat een reeds voltooide taak niet opnieuw moet worden opgestart bij een potentiële crash of reboot halverwege.
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
Het uitwaaierwerk wordt verdeeld over meerdere instanties van de F2
-functie. Let op het gebruik van de aanroep van de NoWait
F2
functie: met deze switch kan de orchestrator doorgaan met het aanroepen F2
zonder te wachten op voltooiing van de activiteit. Het werk wordt opgevolgd met een dynamische takenlijst. De opdracht Wait-ActivityFunction
wordt aangeroepen om te wachten tot alle aangeroepen functies voltooid zijn. Vervolgens wordt de F2
-functie-uitvoer samengevoegd uit de dynamische takenlijst en doorgegeven naar de F3
-functie.
Het automatisch plaatsen van controlepunten bij de Wait-ActivityFunction
-aanroep zorgt ervoor dat een reeds voltooide taak niet opnieuw moet worden opgestart bij een potentiële crash of reboot halverwege.
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
Het uitwaaierwerk wordt verdeeld over meerdere instanties van de F2
-functie. Het werk wordt opgevolgd met een dynamische takenlijst. ctx.allOf(parallelTasks).await()
wordt aangeroepen om te wachten tot alle aangeroepen functies voltooid zijn. Vervolgens worden de uitvoer van de F2
functie samengevoegd vanuit de lijst met dynamische taken en geretourneerd als uitvoer van de orchestratorfunctie.
De automatische controlepunten die plaatsvinden bij het .await()
aanroepen ctx.allOf(parallelTasks)
, zorgt ervoor dat een onverwachte procesre recycle geen reeds voltooide taken opnieuw hoeft te starten.
Notitie
In zeldzame gevallen is het mogelijk dat er een crash optreedt in het venster nadat een activiteitsfunctie wordt voltooid maar voor de voltooiing wordt opgeslagen in de indelingsgeschiedenis. Als dit gebeurt, dan wordt de activiteitsfunctie opnieuw uitgevoerd vanaf het begin nadat het proces hersteld wordt.
Patroon 3: Asynchrone HTTP-API's
Het asynchrone HTTP API-patroon biedt een oplossing voor het probleem van de coördinatie van langdurige bewerkingen met externe clients. Een gebruikelijke manier om dit patroon te implementeren is door de langdurige actie te laten activeren door een HTTP-eindpunt. Leid de client dan om naar een statuseindpunt dat de client bevraagt om te weten te komen wanneer de bewerking is voltooid.
Durable Functions biedt ingebouwde ondersteuning voor dit patroon, waardoor u minder complexe of zelfs helemaal geen code moet schrijven om te communiceren met de uitvoering van langdurige functies. In de quickstartvoorbeelden voor Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell en Java) ziet u bijvoorbeeld een eenvoudige REST-opdracht die u kunt gebruiken om nieuwe orchestratorfunctie-exemplaren te starten. Nadat een instantie is gestart, worden de HTTP-API's van de webhook met de status van de orchestrator-functie weergegeven.
Het volgende voorbeeld geeft REST-opdrachten weer die een orchestrator starten en de status opvragen. Voor de duidelijkheid zijn bepaalde protocoldetails weggelaten uit het voorbeeld.
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}
Aangezien de Durable Functions-runtime de status voor u beheert, moet u geen eigen mechanisme voor het opvolgen van de status meer implementeren.
De Durable Functions-extensie laat ingebouwde HTTP API's zien die langlopende indelingen beheren. U kunt dit patroon ook zelf implementeren met behulp van uw eigen functietriggers (zoals HTTP, een wachtrij of Azure Event Hubs) en de duurzame clientbinding. Zo kunt u bijvoorbeeld een wachtrijbericht gebruiken om beëindiging te activeren. U kunt ook een HTTP-trigger gebruiken die wordt beveiligd door een Microsoft Entra-verificatiebeleid in plaats van de ingebouwde HTTP-API's die gebruikmaken van een gegenereerde sleutel voor verificatie.
Lees het artikel HTTP-functies voor meer informatie over hoe u asynchrone, langdurende processen via HTTP kunt weergeven met behulp van de Durable Functions-extensie.
Patroon 4: Bewaken
Het monitorpatroon verwijst naar een flexibel, terugkerend proces in een werkstroom. Een voorbeeld daarvan is navragen tot er aan specifieke voorwaarden voldaan is. U kunt een gewone timertrigger gebruiken voor een basisscenario, zoals een periodieke opschoningstaak, maar het interval is statisch en het beheer van de levensduur wordt complex. U kunt Durable Functions gebruiken om flexibele, terugkerende intervallen te creëren, de levensduur van taken te beheren en meerdere bewakingsprocessen maken vanuit één indeling.
Een voorbeeld van het bewakingspatroon is om het eerdere asynchrone HTTP API-scenario om te keren. In plaats van een eindpunt weer te geven waarmee een externe client een langdurige bewerking bewaakt, gebruikt de langdurige bewaking een extern eindpunt en wacht het tot de status verandert.
Met enkele regels code kunt u Durable Functions gebruiken om meerdere bewakingen te maken die willekeurige eindpunten observeren. De bewakingen kunnen de uitvoering stopzetten wanneer er aan een voorwaarde is voldaan, of een andere functie kan de duurzame indelingsclient gebruiken om de bewakingen te beëindigen. U kunt het wait
-interval van een bewaking veranderen op basis van een specifieke voorwaarde (bijvoorbeeld exponentieel uitstel).
Met de volgende code wordt een standaardbewaking geïmplementeerd:
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
Wanneer een verzoek ontvangen is, wordt er een nieuwe indelingsinstantie gemaakt voor die taak-id. Het exemplaar peilt een status totdat aan een voorwaarde wordt voldaan of totdat een time-out verloopt. Een duurzame timer controleert het polling-interval. Vervolgens kan er meer werk worden uitgevoerd of kan de indeling beëindigd worden.
Patroon 5: Menselijke interactie
Veel geautomatiseerde processen bevatten een menselijke tussenkomst. Mensen betrekken bij een geautomatiseerd proces is lastig, want mensen zijn niet zo beschikbaar en responsief als cloudservices. Een geautomatiseerd proces kan deze tussenkomst mogelijk maken door time-outs en compensatielogica te gebruiken.
Een goedkeuringsproces is een voorbeeld van een bedrijfsproces waar menselijke tussenkomst aan te pas komt. Zo kan de goedkeuring van een manager vereist zijn voor een onkostennota die een bepaald bedrag overschrijdt. Als de manager de onkostendeclaratie niet binnen 72 uur goedkeurt (de manager is misschien op vakantie gegaan), wordt een escalatieproces gestart om de goedkeuring van iemand anders te krijgen (misschien de manager van de manager).
U kunt het patroon in dit voorbeeld implementeren met behulp van een orchestrator-functie. De orchestrator gebruikt een duurzame timer om goedkeuring te vragen. De orchestrator escaleert wanneer er een time-out optreedt. De orchestrator wacht op een externe gebeurtenis zoals een melding die gegenereerd wordt door een menselijke tussenkomst.
In deze voorbeelden wordt een goedkeuringsproces gemaakt om het menselijke tussenkomstpatroon te demonstreren:
[FunctionName("ApprovalWorkflow")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}
Roep context.CreateTimer
aan om de duurzame timer te maken. De melding wordt ontvangen door context.WaitForExternalEvent
. Vervolgens wordt Task.WhenAny
aangeroepen om te bepalen of er geëscaleerd wordt (time-out vindt eerst plaats) of dat de goedkeuring verwerkt wordt (de goedkeuring wordt ontvangen voor de time-out).
const df = require("durable-functions");
const moment = require('moment');
module.exports = df.orchestrator(function*(context) {
yield context.df.callActivity("RequestApproval");
const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
const durableTimeout = context.df.createTimer(dueTime.toDate());
const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
if (winningEvent === approvalEvent) {
durableTimeout.cancel();
yield context.df.callActivity("ProcessApproval", approvalEvent.result);
} else {
yield context.df.callActivity("Escalate");
}
});
Roep context.df.createTimer
aan om de duurzame timer te maken. De melding wordt ontvangen door context.df.waitForExternalEvent
. Vervolgens wordt context.df.Task.any
aangeroepen om te bepalen of er geëscaleerd wordt (time-out vindt eerst plaats) of dat de goedkeuring verwerkt wordt (de goedkeuring wordt ontvangen voor de time-out).
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
yield context.call_activity("RequestApproval", None)
due_time = context.current_utc_datetime + timedelta(hours=72)
durable_timeout_task = context.create_timer(due_time)
approval_event_task = context.wait_for_external_event("ApprovalEvent")
winning_task = yield context.task_any([approval_event_task, durable_timeout_task])
if approval_event_task == winning_task:
durable_timeout_task.cancel()
yield context.call_activity("ProcessApproval", approval_event_task.result)
else:
yield context.call_activity("Escalate", None)
main = df.Orchestrator.create(orchestrator_function)
Roep context.create_timer
aan om de duurzame timer te maken. De melding wordt ontvangen door context.wait_for_external_event
. Vervolgens wordt context.task_any
aangeroepen om te bepalen of er geëscaleerd wordt (time-out vindt eerst plaats) of dat de goedkeuring verwerkt wordt (de goedkeuring wordt ontvangen voor de time-out).
param($Context)
$output = @()
$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId
$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId
$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait
$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any
if ($approvalEvent -eq $firstEvent) {
Stop-DurableTimerTask -Task $durableTimeoutEvent
$output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
$output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}
$output
Roep Start-DurableTimer
aan om de duurzame timer te maken. De melding wordt ontvangen door Start-DurableExternalEventListener
. Vervolgens wordt Wait-DurableTask
aangeroepen om te bepalen of er geëscaleerd wordt (time-out vindt eerst plaats) of dat de goedkeuring verwerkt wordt (de goedkeuring wordt ontvangen voor de time-out).
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
Met ctx.waitForExternalEvent(...).await()
de methodeaanroep wordt de indeling onderbroken totdat er een gebeurtenis met de naam ApprovalEvent
wordt ontvangen, die een boolean
nettolading heeft. Als de gebeurtenis wordt ontvangen, wordt een activiteitsfunctie aangeroepen om het goedkeuringsresultaat te verwerken. Als een dergelijke gebeurtenis echter niet wordt ontvangen voordat de timeout
(72 uur) verloopt, wordt er een TaskCanceledException
gegenereerd en wordt de Escalate
activiteitsfunctie aangeroepen.
Notitie
Er worden geen kosten in rekening gebracht voor tijd die is besteed aan het wachten op externe gebeurtenissen bij uitvoering in het verbruiksabonnement.
Een externe client kan de gebeurtenismelding afleveren aan een wachtende orchestrator-functie met behulp van de ingebouwde HTTP API's:
curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"
Een gebeurtenis kan ook worden gegenereerd met behulp van de duurzame indelingsclient van een andere functie in dezelfde functie-app:
[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
[HttpTrigger] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
bool isApproved = true;
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
import azure.durable_functions as df
async def main(client: str):
durable_client = df.DurableOrchestrationClient(client)
is_approved = True
await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"
@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
@HttpTrigger(name = "instanceId") String instanceId,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
client.raiseEvent(instanceId, "ApprovalEvent", true);
}
Patroon 6: Aggregator (stateful entiteiten)
Het zesde patroon is het samenvoegen van gebeurtenisgegevens gedurende een bepaalde periode in één enkele, adresseerbare entiteit. In dit patroon kunnen de gegevens die worden samengevoegd afkomstig zijn uit meerdere bronnen, in batches worden geleverd of verspreid zijn over lange perioden. De aggregator moet mogelijk actie ondernemen op gebeurtenisgegevens wanneer deze binnenkomen en externe clients moeten mogelijk query's uitvoeren op de geaggregeerde gegevens.
Wat de implementatie van dit patroon met normale, staatloze functies lastig maakt is dat het beheer van gelijktijdigheid erg moeilijk wordt. Niet alleen bewerken verschillende threads tegelijkertijd dezelfde gegevens, maar u dient er ook op te letten dat de aggregator slechts op één virtuele machine tegelijkertijd wordt uitgevoerd.
U kunt Duurzame entiteiten gebruiken om dit patroon eenvoudig als een enkele functie te implementeren.
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
Duurzame entiteiten kunnen ook gemodelleerd worden als klassen in .NET. Dit model kan handig zijn als de lijst met bewerkingen vaststaat en groot wordt. Het volgende voorbeeld is een equivalente implementatie van de Counter
-entiteit met .NET-klassen en -methodes.
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");
module.exports = df.entity(function(context) {
const currentValue = context.df.getState(() => 0);
switch (context.df.operationName) {
case "add":
const amount = context.df.getInput();
context.df.setState(currentValue + amount);
break;
case "reset":
context.df.setState(0);
break;
case "get":
context.df.return(currentValue);
break;
}
});
import azure.functions as func
import azure.durable_functions as df
def entity_function(context: df.DurableOrchestrationContext):
current_value = context.get_state(lambda: 0)
operation = context.operation_name
if operation == "add":
amount = context.get_input()
current_value += amount
context.set_result(current_value)
elif operation == "reset":
current_value = 0
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
main = df.Entity.create(entity_function)
Notitie
Duurzame entiteiten worden momenteel niet ondersteund in PowerShell.
Notitie
Duurzame entiteiten worden momenteel niet ondersteund in Java.
Clients kunnen bewerkingen in de wachtrij plaatsen (ook bekend als 'signalering') voor een entiteitsfunctie met behulp van de entiteitsclientbinding.
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
Notitie
Dynamisch gegenereerde proxy's zijn ook beschikbaar in .NET om entiteiten op typebeveiligde wijze te signaleren. Naast signaleren kunnen clients de status van een entiteitsfunctie ook opvragen met typebeveiligde methodes op de indelingsclientbinding.
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
entity_id = df.EntityId("Counter", "myCounter")
instance_id = await client.signal_entity(entity_id, "add", 1)
return func.HttpResponse("Entity signaled")
Entiteitsfuncties zijn beschikbaar in Durable Functions 2.0 en hoger voor C#, JavaScript en Python.
De technologie
Achter de schermen is de Durable Functions-extensie gebouwd op het Durable Task Framework, een opensource-bibliotheek in GitHub die gebruikt wordt om werkstromen te bouwen in code. Net zoals Azure Functions de serverloze evolutie van Azure WebJobs is, is Durable Functions de serverloze evolutie van het Durable Task Framework. Microsoft en andere organisaties gebruiken het Durable Task Framework om bedrijfskritieke processen te automatiseren. Het sluit naadloos aan op de serverloze Azure Functions-omgeving.
Codebeperkingen
Om betrouwbare en langdurige uitvoeringsgaranties te bieden, beschikken orchestrator-functies over een set coderegels die gevolg moeten worden. Zie het artikel Codebeperkingen voor de orchestrator-functie.
Billing
Durable Functions worden op dezelfde manier in rekening gebracht als Azure Functions. Zie Prijzen voor Azure Functions voor meer informatie. Wanneer u orchestrator-functies in het Verbruiksabonnement van Azure Functions uitvoert, dient u op enkele factureringseigenschappen te letten. Zie het artikel Durable Functions-facturering voor meer informatie hierover.
Duik er meteen in
U kunt in minder dan 10 minuten aan de slag gaan met Durable Functions door een van deze taalspecifieke quickstart-zelfstudies te volgen:
- C# met Visual Studio 2019
- JavaScript met Visual Studio Code
- TypeScript met Visual Studio Code
- Python met Visual Studio Code
- PowerShell met Visual Studio Code
- Java met Maven
In deze quickstarts maakt en test u een lokale duurzame 'hallo wereld'-functie. Vervolgens publiceert u de functiecode op Azure. De functie die u maakt, organiseert en koppelt aanroepen naar andere functies.
Publicaties
Durable Functions is ontwikkeld in samenwerking met Microsoft Research. Hierdoor produceert het Durable Functions-team actief onderzoeksdocumenten en artefacten; dit zijn onder andere:
- Durable Functions: Semantiek voor stateful serverloos (OOPSLA'21)
- Serverloze werkstromen met Durable Functions en Netherite (vooraf afdrukken)
Meer informatie
De volgende video laat de voordelen van Durable Functions zien:
Omdat Durable Functions een geavanceerde extensie voor Azure Functions is, is het niet geschikt voor alle toepassingen. Zie voor een vergelijking met andere Azure-technologieën Compare Azure Functions and Azure Logic Apps.