Co je Durable Functions?
Durable Functions je funkce Azure Functions, která umožňuje psát stavové funkce v bezserverovém výpočetním prostředí. Rozšíření umožňuje definovat stavové pracovní postupy tím, že pomocí programovacího modelu Azure Functions napíšete funkce orchestrátoru a stavové entity. Rozšíření na pozadí spravuje stav, kontrolní body a restartuje, takže se můžete soustředit na obchodní logiku.
Podporované jazyky
Durable Functions je navržený tak, aby fungoval se všemi programovacími jazyky Azure Functions, ale pro každý jazyk může mít jiné minimální požadavky. Následující tabulka uvádí minimální podporované konfigurace aplikací:
Zásobník jazyků | Verze modulu runtime Azure Functions | Verze pracovního procesu jazyka | Minimální verze sad |
---|---|---|---|
.NET / C# / F# | Funkce 1.0+ | Probíhá proces Mimoprocesové |
Není k dispozici |
JavaScript/TypeScript (model verze 3) | Funkce 2.0+ | Uzel 8 a novější | Sady 2.x |
JavaScript/TypeScript (model v4 prog) | Funkce 4.25 a novější | Uzel 18 nebo novější | 3.15+ sady |
Python | Funkce 2.0+ | Python 3.7 nebo novější | Sady 2.x |
Python (v2 prog. model) | Funkce 4.0 a novější | Python 3.7 nebo novější | 3.15+ sady |
PowerShell | Funkce 3.0 a novější | PowerShell 7 a novější | Sady 2.x |
Java | Funkce 4.0 a novější | Java 8 a novější | Sady 4.x |
Důležité
Tento článek používá karty pro podporu více verzí programovacího modelu Node.js. Model v4 je obecně dostupný a je navržený tak, aby měl flexibilnější a intuitivnější prostředí pro vývojáře v JavaScriptu a TypeScriptu. Další podrobnosti o tom, jak model v4 funguje, najdete v příručce pro vývojáře služby Azure Functions Node.js. Další informace o rozdílech mezi v3 a v4 najdete v průvodci migrací.
Důležité
Tento článek používá karty pro podporu více verzí programovacího modelu Pythonu. Model v2 je obecně dostupný a je navržený tak, aby poskytoval přístupnější způsob vytváření funkcí prostřednictvím dekorátorů. Další podrobnosti o tom, jak model v2 funguje, najdete v příručce pro vývojáře v Pythonu pro Azure Functions.
Podobně jako Azure Functions existují šablony, které vám pomůžou vyvíjet Durable Functions pomocí sady Visual Studio, Visual Studio Code a webu Azure Portal.
Vzory aplikací
Primárním případem použití Durable Functions je zjednodušení složitých a stavových požadavků koordinace v bezserverových aplikacích. Následující části popisují typické vzory aplikací, které můžou využívat Durable Functions:
- Řetězení funkcí
- Ventilátor /ventilátor
- Asynchronní rozhraní API HTTP
- Monitorování
- Lidská interakce
- Agregátor (stavové entity)
Pattern č. 1: Řetězení funkcí
V modelu zřetězení funkcí se posloupnost funkcí provede v určitém pořadí. V tomto vzoru se výstup jedné funkce použije na vstup jiné funkce. Použití front mezi jednotlivými funkcemi zajišťuje, že systém zůstane trvalý a škálovatelný, i když existuje tok řízení z jedné funkce do další.
Durable Functions můžete použít k implementaci modelu řetězení funkcí stručně, jak je znázorněno v následujícím příkladu.
V tomto příkladu jsou hodnoty F1
, F2
, F3
a F4
jsou názvy jiných funkcí ve stejné aplikaci funkcí. Tok řízení můžete implementovat pomocí normálních imperativních konstruktorů kódování. Kód se spustí shora dolů. Kód může zahrnovat sémantiku toku řízení jazyka, jako jsou podmíněné výrazy a smyčky. Logiku zpracování chyb můžete zahrnout do try
//catch
finally
bloků.
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
Tento parametr můžete použít context
k vyvolání dalších funkcí podle názvu, předání parametrů a výstupu návratové funkce. Pokaždé, když kód volá await
, Durable Functions framework kontrolní body průběh aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího await
volání. Další informace najdete v další části Vzor č. 2: Ventilátor nebo ventilátor.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
try {
const x = yield context.df.callActivity("F1");
const y = yield context.df.callActivity("F2", x);
const z = yield context.df.callActivity("F3", y);
return yield context.df.callActivity("F4", z);
} catch (error) {
// Error handling or compensation goes here.
}
});
Objekt můžete použít context.df
k vyvolání dalších funkcí podle názvu, předání parametrů a návratového výstupu funkce. Pokaždé, když kód volá yield
, Durable Functions framework kontrolní body průběh aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího yield
volání. Další informace najdete v další části Vzor č. 2: Ventilátor nebo ventilátor.
Poznámka:
Objekt context
v JavaScriptu představuje celý kontext funkce. Přístup k kontextu Durable Functions pomocí df
vlastnosti v hlavním kontextu.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
Objekt můžete použít context
k vyvolání dalších funkcí podle názvu, předání parametrů a návratového výstupu funkce. Pokaždé, když kód volá yield
, Durable Functions framework kontrolní body průběh aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího yield
volání. Další informace najdete v další části Vzor č. 2: Ventilátor nebo ventilátor.
Poznámka:
Objekt context
v Pythonu představuje kontext orchestrace. Přístup k hlavnímu function_context
kontextu Azure Functions pomocí vlastnosti v kontextu orchestrace.
param($Context)
$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z
Příkaz můžete použít Invoke-DurableActivity
k vyvolání dalších funkcí podle názvu, předání parametrů a výstupu návratové funkce. Pokaždé, když kód volá Invoke-DurableActivity
bez NoWait
přepínače, architektura Durable Functions zkontroluje průběh aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího Invoke-DurableActivity
volání. Další informace najdete v další části Vzor č. 2: Ventilátor nebo ventilátor.
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
Objekt můžete použít ctx
k vyvolání dalších funkcí podle názvu, předání parametrů a návratového výstupu funkce. Výstup těchto metod je objekt, Task<V>
kde V
je typ dat vrácený vyvolanou funkcí. Pokaždé, když zavoláte Task<V>.await()
, Durable Functions framework kontrolní body průběh aktuální instance funkce. Pokud proces neočekávaně recykluje uprostřed provádění, instance funkce se obnoví z předchozího Task<V>.await()
volání. Další informace najdete v další části Vzor č. 2: Ventilátor nebo ventilátor.
Vzorek č. 2: Ventilátor nebo ventilátor v
Ve vzorci ventilátoru/ventilátoru spustíte paralelně více funkcí a pak počkáte na dokončení všech funkcí. U výsledků vrácených z funkcí se často provádí určitá agregační práce.
Díky normálním funkcím můžete posílat více zpráv do fronty. Fanning vzadu je mnohem náročnější. Pokud chcete ventilátorovat, napíšete v normální funkci kód, který bude sledovat, kdy funkce aktivované frontou končí, a pak uloží výstupy funkce.
Rozšíření Durable Functions zpracovává tento vzor s relativně jednoduchým kódem:
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
Práce s ventilátorem se distribuuje F2
do několika instancí funkce. Práce se sleduje pomocí dynamického seznamu úkolů. Task.WhenAll
je volána k čekání na dokončení všech volané funkce. F2
Výstupy funkce se pak agregují ze seznamu dynamických úkolů a předají funkciF3
.
Automatické vytváření kontrolních bodů, které se děje při await
volání, Task.WhenAll
zajišťuje, že potenciální chybové ukončení nebo restartování uprostřed cesty nevyžaduje restartování již dokončené úlohy.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
Práce s ventilátorem se distribuuje F2
do několika instancí funkce. Práce se sleduje pomocí dynamického seznamu úkolů. context.df.Task.all
Rozhraní API se volá, aby čekalo na dokončení všech volavaných funkcí. F2
Výstupy funkce se pak agregují ze seznamu dynamických úkolů a předají funkciF3
.
Automatické vytváření kontrolních bodů, které se děje při yield
volání, context.df.Task.all
zajišťuje, že potenciální chybové ukončení nebo restartování uprostřed cesty nevyžaduje restartování již dokončené úlohy.
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
Práce s ventilátorem se distribuuje F2
do několika instancí funkce. Práce se sleduje pomocí dynamického seznamu úkolů. context.task_all
Rozhraní API se volá, aby čekalo na dokončení všech volavaných funkcí. F2
Výstupy funkce se pak agregují ze seznamu dynamických úkolů a předají funkciF3
.
Automatické vytváření kontrolních bodů, které se děje při yield
volání, context.task_all
zajišťuje, že potenciální chybové ukončení nebo restartování uprostřed cesty nevyžaduje restartování již dokončené úlohy.
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
Práce s ventilátorem se distribuuje F2
do několika instancí funkce. Všimněte si použití NoWait
přepínače F2
při vyvolání funkce: tento přepínač umožňuje orchestrátoru pokračovat v volání F2
bez čekání na dokončení aktivity. Práce se sleduje pomocí dynamického seznamu úkolů. Příkaz Wait-ActivityFunction
se volá, aby počkal, až se dokončí všechny volané funkce. F2
Výstupy funkce se pak agregují ze seznamu dynamických úkolů a předají funkciF3
.
Automatické vytváření kontrolních bodů, které se děje při Wait-ActivityFunction
volání, zajišťuje, že potenciální chybové ukončení nebo restartování uprostřed cesty nevyžaduje restartování již dokončené úlohy.
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
Práce s ventilátorem se distribuuje F2
do několika instancí funkce. Práce se sleduje pomocí dynamického seznamu úkolů. ctx.allOf(parallelTasks).await()
je volána k čekání na dokončení všech volané funkce. F2
Výstupy funkce se pak agregují ze seznamu dynamických úloh a vrátí se jako výstup funkce orchestrátoru.
Automatické vytváření kontrolních bodů, které se děje při .await()
volání, ctx.allOf(parallelTasks)
zajišťuje, že neočekávaný recyklace procesu nevyžaduje restartování již dokončených úkolů.
Poznámka:
Ve výjimečných případech je možné, že po dokončení funkce aktivity dojde k chybovému ukončení v okně, ale před uložením jeho dokončení do historie orchestrace. Pokud k tomu dojde, funkce aktivity by se po obnovení procesu znovu spustila od začátku.
Vzor č. 3: Asynchronní rozhraní HTTP API
Asynchronní vzor rozhraní HTTP API řeší problém koordinace stavu dlouhotrvajících operací s externími klienty. Běžným způsobem implementace tohoto modelu je, že koncový bod HTTP aktivuje dlouhotrvající akci. Pak přesměrujte klienta na koncový bod stavu, který klient dotazuje, až se operace dokončí.
Durable Functions poskytuje integrovanou podporu pro tento vzor, zjednodušuje nebo dokonce odebírá kód, který potřebujete k zápisu pro interakci s dlouhotrvajícími spouštěními funkcí. Ukázky rychlého startu Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell a Java) ukazují jednoduchý příkaz REST, který můžete použít ke spuštění nových instancí funkcí orchestrátoru. Po spuštění instance rozšíření zveřejňuje rozhraní API HTTP webhooku, která se dotazují na stav funkce orchestrátoru.
Následující příklad ukazuje příkazy REST, které spouští orchestrátor a dotazuje se na jeho stav. Kvůli přehlednosti jsou některé podrobnosti protokolu z příkladu vynechány.
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}
Vzhledem k tomu, že modul runtime Durable Functions spravuje stav za vás, nemusíte implementovat vlastní mechanismus sledování stavu.
Rozšíření Durable Functions zveřejňuje integrovaná rozhraní API HTTP, která spravují dlouhotrvající orchestrace. Tento vzor můžete implementovat sami pomocí vlastních triggerů funkcí (například HTTP, fronty nebo služby Azure Event Hubs) a trvalé vazby klienta. K aktivaci ukončení můžete například použít zprávu fronty. Nebo můžete použít trigger HTTP, který je chráněný zásadami ověřování Microsoft Entra místo integrovaných rozhraní HTTP API, která k ověřování používají vygenerovaný klíč.
Další informace najdete v článku o funkcích HTTP, který vysvětluje, jak můžete zveřejnit asynchronní dlouhotrvající procesy přes HTTP pomocí rozšíření Durable Functions.
Vzor č. 4: Monitorování
Vzor monitorování odkazuje na flexibilní opakující se proces v pracovním postupu. Příkladem je dotazování, dokud nebudou splněny konkrétní podmínky. Pomocí běžného triggeru časovače můžete řešit základní scénář, jako je například pravidelná úloha čištění, ale jeho interval je statický a správa životností instancí je složitá. Durable Functions můžete použít k vytváření flexibilních intervalů opakování, správě životností úloh a vytváření více procesů monitorování z jedné orchestrace.
Příkladem modelu monitorování je obrácení předchozího asynchronního scénáře rozhraní HTTP API. Místo vystavení koncového bodu pro externího klienta pro monitorování dlouhotrvající operace využívá dlouhotrvající monitorování externí koncový bod a pak čeká na změnu stavu.
V několika řádcích kódu můžete pomocí Durable Functions vytvořit několik monitorů, které sledují libovolné koncové body. Monitorování můžou ukončit provádění, pokud je splněna podmínka, nebo jiná funkce může k ukončení monitorování použít trvalý orchestrační klient. Interval monitorování wait
můžete změnit na základě konkrétní podmínky (například exponenciální zpoždnění).)
Následující kód implementuje základní monitorování:
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
Po přijetí požadavku se pro toto ID úlohy vytvoří nová instance orchestrace. Instance se dotazuje na stav, dokud není splněna podmínka nebo dokud nevyprší časový limit. Trvalý časovač řídí interval dotazování. Pak je možné provést více práce nebo orchestrace může končit.
Vzor č. 5: Lidská interakce
Mnoho automatizovaných procesů zahrnuje určitý druh lidské interakce. Zapojení lidí do automatizovaného procesu je složité, protože lidé nejsou tak vysoce dostupné a reagují jako cloudové služby. Automatizovaný proces může této interakci umožnit pomocí časových limitů a logiky kompenzace.
Proces schvalování je příkladem obchodního procesu, který zahrnuje lidskou interakci. Schválení manažera může být vyžadováno pro sestavu výdajů, která překračuje určitou částku v dolarech. Pokud manažer zprávu o výdajích neschválí do 72 hodin (může jít o manažera na dovolenou), zahájí se proces eskalace, který získá schválení od někoho jiného (možná manažera).
Vzor v tomto příkladu můžete implementovat pomocí funkce orchestrátoru. Orchestrátor používá trvalý časovač k vyžádání schválení. Orchestrátor eskaluje, pokud dojde k vypršení časového limitu. Orchestrátor čeká na externí událost, například na oznámení vygenerované lidskou interakcí.
Tyto příklady vytvoří schvalovací proces, který předvede vzor lidské interakce:
[FunctionName("ApprovalWorkflow")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}
Chcete-li vytvořit trvalý časovač, volání context.CreateTimer
. Oznámení obdrží context.WaitForExternalEvent
. Pak se zavolá, aby se rozhodlo, Task.WhenAny
jestli se má eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).
const df = require("durable-functions");
const moment = require('moment');
module.exports = df.orchestrator(function*(context) {
yield context.df.callActivity("RequestApproval");
const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
const durableTimeout = context.df.createTimer(dueTime.toDate());
const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
if (winningEvent === approvalEvent) {
durableTimeout.cancel();
yield context.df.callActivity("ProcessApproval", approvalEvent.result);
} else {
yield context.df.callActivity("Escalate");
}
});
Chcete-li vytvořit trvalý časovač, volání context.df.createTimer
. Oznámení obdrží context.df.waitForExternalEvent
. Pak se zavolá, aby se rozhodlo, context.df.Task.any
jestli se má eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
yield context.call_activity("RequestApproval", None)
due_time = context.current_utc_datetime + timedelta(hours=72)
durable_timeout_task = context.create_timer(due_time)
approval_event_task = context.wait_for_external_event("ApprovalEvent")
winning_task = yield context.task_any([approval_event_task, durable_timeout_task])
if approval_event_task == winning_task:
durable_timeout_task.cancel()
yield context.call_activity("ProcessApproval", approval_event_task.result)
else:
yield context.call_activity("Escalate", None)
main = df.Orchestrator.create(orchestrator_function)
Chcete-li vytvořit trvalý časovač, volání context.create_timer
. Oznámení obdrží context.wait_for_external_event
. Pak se zavolá, aby se rozhodlo, context.task_any
jestli se má eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).
param($Context)
$output = @()
$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId
$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId
$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait
$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any
if ($approvalEvent -eq $firstEvent) {
Stop-DurableTimerTask -Task $durableTimeoutEvent
$output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
$output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}
$output
Chcete-li vytvořit trvalý časovač, volání Start-DurableTimer
. Oznámení obdrží Start-DurableExternalEventListener
. Pak se zavolá, aby se rozhodlo, Wait-DurableTask
jestli se má eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
Volání ctx.waitForExternalEvent(...).await()
metody pozastaví orchestraci, dokud neobdrží událost s názvem ApprovalEvent
, která má datovou boolean
část. Pokud je událost přijata, volá se funkce aktivity pro zpracování výsledku schválení. Pokud však žádná taková událost není přijata před vypršením timeout
platnosti (72 hodin), vyvolá se vyvolání TaskCanceledException
a Escalate
zavolá se funkce aktivity.
Poznámka:
Za dobu strávenou čekáním na externí události při spuštění v plánu Consumption se neúčtují žádné poplatky.
Externí klient může doručit oznámení události do funkce čekající orchestrátoru pomocí integrovaných rozhraní API HTTP:
curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"
Událost lze vyvolat také pomocí klienta trvalé orchestrace z jiné funkce ve stejné aplikaci funkcí:
[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
[HttpTrigger] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
bool isApproved = true;
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
import azure.durable_functions as df
async def main(client: str):
durable_client = df.DurableOrchestrationClient(client)
is_approved = True
await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"
@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
@HttpTrigger(name = "instanceId") String instanceId,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
client.raiseEvent(instanceId, "ApprovalEvent", true);
}
Vzor č. 6: Agregátor (stavové entity)
Šestý vzor se týká agregace dat událostí v určitém časovém období do jedné adresovatelné entity. V tomto modelu můžou agregovaná data pocházet z více zdrojů, mohou být doručena v dávkách nebo mohou být rozptýlená po dlouhou dobu. Agregátor může potřebovat provést akci s daty událostí při jejich doručení a externí klienti se můžou muset dotazovat na agregovaná data.
Složitá věc, když se snažíte implementovat tento vzor s normálními bezstavovými funkcemi, je, že řízení souběžnosti se stává obrovskou výzvou. Nemusíte si dělat starosti jenom s několika vlákny, které upravují stejná data současně, ale potřebujete se také starat o to, aby agregátor běžel jenom na jednom virtuálním počítači najednou.
Durable entity můžete použít k snadné implementaci tohoto vzoru jako jedné funkce.
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
Odolné entity lze také modelovat jako třídy v .NET. Tento model může být užitečný, pokud je seznam operací pevný a bude velký. Následující příklad je ekvivalentní implementací Counter
entity pomocí tříd a metod .NET.
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");
module.exports = df.entity(function(context) {
const currentValue = context.df.getState(() => 0);
switch (context.df.operationName) {
case "add":
const amount = context.df.getInput();
context.df.setState(currentValue + amount);
break;
case "reset":
context.df.setState(0);
break;
case "get":
context.df.return(currentValue);
break;
}
});
import azure.functions as func
import azure.durable_functions as df
def entity_function(context: df.DurableOrchestrationContext):
current_value = context.get_state(lambda: 0)
operation = context.operation_name
if operation == "add":
amount = context.get_input()
current_value += amount
context.set_result(current_value)
elif operation == "reset":
current_value = 0
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
main = df.Entity.create(entity_function)
Poznámka:
Odolné entity se v PowerShellu v současné době nepodporují.
Poznámka:
Odolné entity se v javě v současné době nepodporují.
Klienti mohou zavázat operace pro funkci entity (označovanou také jako "signaling") pomocí vazby klienta entity.
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
Poznámka:
Dynamicky generované proxy servery jsou také k dispozici v .NET pro signalizaci entit způsobem bezpečným způsobem. Kromě signalizace mohou klienti také dotazovat stav funkce entity pomocí metod bezpečných typů na vazbě klienta orchestrace.
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
entity_id = df.EntityId("Counter", "myCounter")
instance_id = await client.signal_entity(entity_id, "add", 1)
return func.HttpResponse("Entity signaled")
Funkce entit jsou k dispozici v Durable Functions 2.0 a vyšší pro C#, JavaScript a Python.
Technologie
Rozšíření Durable Functions je na pozadí postavené na platformě Durable Task Framework, opensourcové knihovně na GitHubu, která se používá k vytváření pracovních postupů v kódu. Stejně jako Azure Functions je bezserverový vývoj webových úloh Azure, Durable Functions je bezserverový vývoj architektury Durable Task Framework. Microsoft a další organizace používají architekturu Durable Task Framework k rozsáhlé automatizaci důležitých procesů. Je přirozeným prostředím bezserverové služby Azure Functions.
Omezení kódu
Aby bylo možné poskytovat spolehlivé a dlouhotrvající záruky provádění, mají funkce orchestrátoru sadu pravidel kódování, která je potřeba dodržovat. Další informace najdete v článku Omezení kódu funkce nástroje Orchestrator.
Fakturace
Durable Functions se účtují stejně jako Azure Functions. Další informace najdete v tématu Ceny služby Azure Functions. Při spouštění funkcí orchestrátoru v plánu Consumption služby Azure Functions existuje několik chování fakturace, o nichž je potřeba vědět. Další informace o těchto chováních najdete v článku fakturace Durable Functions.
Přeskočte doprava
S Durable Functions můžete začít pracovat za méně než 10 minut dokončením některého z těchto kurzů rychlých startů pro konkrétní jazyk:
- C# pomocí sady Visual Studio 2019
- JavaScript s využitím editoru Visual Studio Code
- TypeScript s využitím editoru Visual Studio Code
- Python s využitím editoru Visual Studio Code
- PowerShell s využitím editoru Visual Studio Code
- Java s využitím Mavenu
V těchto rychlých startech místně vytvoříte a otestujete odolnou funkci "hello world". Kód funkce potom publikujete do Azure. Funkce, kterou vytvoříte, orchestruje a zřetědí volání dalších funkcí.
Publikace
Durable Functions se vyvíjí ve spolupráci s Microsoft Research. V důsledku toho tým Durable Functions aktivně vytváří výzkumné studie a artefakty; mezi ně patří:
- Durable Functions: Sémantika pro stavový bezserverový (OOPSLA'21)
- Bezserverové pracovní postupy s Durable Functions a Netherite (před tiskem)
Další informace
Následující video ukazuje výhody Durable Functions:
Vzhledem k tomu, že Durable Functions je rozšířené rozšíření pro Azure Functions, není vhodné pro všechny aplikace. Porovnání s jinými technologiemi orchestrace Azure najdete v tématu Porovnání Azure Functions a Azure Logic Apps.