Poznámka
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Durable Functions je funkce Azure Functions, která umožňuje psát stavové funkce v bezserverovém výpočetním prostředí. Rozšíření umožňuje definovat stavové pracovní postupy tím, že pomocí programovacího modelu Azure Functions napíšete funkce orchestrátoru a stavové entity. Rozšíření v zákulisí spravuje stav, kontrolní body a restartování, což vám umožňuje soustředit se na obchodní logiku.
Podporované jazyky
Durable Functions je navržený tak, aby fungoval se všemi programovacími jazyky Azure Functions, ale pro každý jazyk může mít jiné minimální požadavky. Následující tabulka uvádí minimální podporované konfigurace aplikací:
Zásobník jazyků | Verze modulu runtime Azure Functions | Verze jazykového pracovníka | Minimální verze sad |
---|---|---|---|
.NET / C# / F# | Funkce 1.0+ | Probíhá proces Mimoprocesové |
Není k dispozici |
JavaScript/TypeScript (programovací model verze 3) | Funkce 2.0+ | Node.js 8 a novější | Balíčky 2.x |
JavaScript/TypeScript (programovací model v4) | Funkce 4.25 a novější | Node.js ve verzi 18 nebo novější | 3.15+ sady |
Python | Funkce 2.0+ | Python 3.7 nebo novější | Balíčky 2.x |
Python (v2 programovací model) | Funkce 4.0 a novější | Python 3.7 nebo novější | 3.15+ sady |
PowerShell | Funkce 3.0 a novější | PowerShell 7 a novější | Sady 2.x |
Java | Funkce 4.0 a novější | Java 8 a novější | Balíčky 4.x |
Důležité
Tento článek používá karty pro podporu více verzí programovacího modelu Node.js. Model v4 je obecně dostupný a je navržený tak, aby měl flexibilnější a intuitivnější prostředí pro vývojáře v JavaScriptu a TypeScriptu. Další podrobnosti o tom, jak model v4 funguje, najdete v příručce pro vývojáře služby Azure Functions Node.js. Další informace o rozdílech mezi v3 a v4 najdete v průvodci migrací.
Důležité
Tento článek používá karty pro podporu více verzí programovacího modelu Pythonu. Model v2 je obecně dostupný a je navržený tak, aby poskytoval způsob vytváření funkcí, který je více zaměřený na kód, prostřednictvím dekorátorů. Další informace o modelu v2 najdete v příručce pro vývojáře v Pythonu pro Azure Functions.
Podobně jako Azure Functions existují šablony, které vám pomůžou vyvíjet Durable Functions pomocí sady Visual Studio, Visual Studio Code a webu Azure Portal.
Vzory aplikací
Primárním případem použití Durable Functions je zjednodušení složitých a stavových požadavků koordinace v bezserverových aplikacích. Následující části popisují typické vzory aplikací, které můžou využívat Durable Functions:
- Řetězení funkcí
- Ventilátor /ventilátor
- Asynchronní rozhraní API HTTP
- Monitorování
- Lidská interakce
- Agregátor (stavové entity)
Pattern č. 1: Řetězení funkcí
V modelu zřetězení funkcí se posloupnost funkcí provede v určitém pořadí. V tomto vzoru se výstup jedné funkce použije na vstup jiné funkce. Použití front mezi jednotlivými funkcemi zajišťuje, že systém zůstane trvalý a škálovatelný, i když existuje tok řízení z jedné funkce do další.
Durable Functions můžete použít k implementaci modelu řetězení funkcí stručně, jak je znázorněno v následujícím příkladu.
Důležité
Podpora modelu v procesu končí 10. listopadu 2026. Důrazně doporučujeme migrovat aplikace do izolovaného pracovního modelu.
V tomto příkladu jsou hodnoty F1
, F2
, F3
a F4
jsou názvy jiných funkcí ve stejné aplikaci funkcí. Tok řízení můžete implementovat pomocí normálních imperativních konstruktorů kódování. Kód se spustí shora dolů. Kód může zahrnovat sémantiku toku řízení jazyka, jako jsou podmíněné výrazy a smyčky. Logiku zpracování chyb můžete zahrnout do try
/catch
/finally
bloků.
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
Tento parametr context
můžete použít k vyvolání dalších funkcí podle jména, předání parametrů a návratu výstupu funkce. Pokaždé, když kód volá await
, Durable Functions framework kontrolní body průběh aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího await
volání. Další informace najdete v následující části: Vzor č. 2: Fan-out/fan-in.
V tomto příkladu jsou hodnoty F1
, F2
, F3
a F4
jsou názvy jiných funkcí ve stejné aplikaci funkcí. Tok řízení můžete implementovat pomocí normálních imperativních konstruktorů kódování. Kód se spustí shora dolů. Kód může zahrnovat existující řídicí struktury jazyka, jako jsou podmínky a smyčky. Logiku zpracování chyb můžete zahrnout do try
/catch
/finally
bloků.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
try {
const x = yield context.df.callActivity("F1");
const y = yield context.df.callActivity("F2", x);
const z = yield context.df.callActivity("F3", y);
return yield context.df.callActivity("F4", z);
} catch (error) {
// Error handling or compensation goes here.
}
});
Objekt context.df
můžete použít k volání dalších funkcí podle jejich názvu, předávání parametrů a získání výstupu funkce. Pokaždé, když kód volá yield
, framework Durable Functions vytvoří kontrolní bod pro zaznamenání průběhu aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího yield
volání. Další informace najdete v další části Vzor č. 2: Fan-out/fan-in.
Poznámka:
Objekt context
v JavaScriptu představuje celý kontext funkce. Přístup k kontextu Durable Functions pomocí df
vlastnosti v hlavním kontextu.
V tomto příkladu jsou hodnoty F1
, F2
, F3
a F4
jsou názvy jiných funkcí ve stejné aplikaci funkcí. Tok řízení můžete implementovat pomocí normálních imperativních konstruktorů kódování. Kód se spustí shora dolů. Kód může zahrnovat řídící struktury jazyka, jako jsou podmínky a cykly.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
Objekt context
můžete použít k vyvolání dalších funkcí podle názvu, předání parametrů a získání výstupu funkce. Pokaždé, když kód volá yield
, rámec Durable Functions připraví kontrolní bod postupu aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího yield
volání. Další informace najdete v další kapitole, Vzor č. 2: Fan-out/fan-in.
Poznámka:
Objekt context
v Pythonu představuje kontext orchestrace. Přístup k hlavnímu function_context
kontextu Azure Functions pomocí vlastnosti v kontextu orchestrace.
V tomto příkladu jsou hodnoty F1
, F2
, F3
a F4
jsou názvy jiných funkcí ve stejné aplikaci funkcí. Tok řízení můžete implementovat pomocí normálních imperativních konstruktorů kódování. Kód se spustí shora dolů. Kód může zahrnovat stávající principy řízení toku programu, jako jsou podmínky a cykly.
param($Context)
$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z
Příkaz můžete použít Invoke-DurableActivity
k vyvolání dalších funkcí podle názvu, předání parametrů a výstupu návratové funkce. Pokaždé, když kód volá Invoke-DurableActivity
bez NoWait
přepínače, architektura Durable Functions zkontroluje průběh aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího Invoke-DurableActivity
volání. Další informace najdete v další sekci Vzor č. 2: Fan-out/fan-in.
V tomto příkladu jsou hodnoty F1
, F2
, F3
a F4
jsou názvy jiných funkcí ve stejné aplikaci funkcí. Tok řízení můžete implementovat pomocí normálních imperativních konstruktorů kódování. Kód se spustí shora dolů. Kód může obsahovat sémantiku řízení toku programu, jako jsou podmínky a smyčky.
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
Objekt ctx
můžete použít k vyvolání dalších funkcí podle názvu, pro předání parametrů a vrácení výstupu funkce. Výstup těchto metod je objekt, Task<V>
kde V
je typ dat vrácený vyvolanou funkcí. Pokaždé, když zavoláte Task<V>.await()
, framework Durable Functions ukládá kontrolní body pro průběh aktuální instance funkce. Pokud proces neočekávaně recykluje uprostřed provádění, instance funkce se obnoví z předchozího Task<V>.await()
volání. Další informace najdete v následující sekci Vzor č. 2: Fan-out/fan-in.
Vzor č. 2: Fan-out/fan-in
Ve vzoru ventilátoru/ventilátoru spustíte paralelně několik funkcí a pak počkáte na dokončení všech funkcí. U výsledků vrácených z funkcí se často provádí určitá agregační práce.
Díky normálním funkcím můžete posílat více zpráv do fronty. Zapojování zpět je mnohem náročnější. Pokud chcete implementovat „fan in“ ve funkci, napíšete kód, který sleduje, kdy končí funkce aktivované frontou, a potom uložíte jejich výstupy.
Rozšíření Durable Functions zpracovává tento vzor s relativně jednoduchým kódem:
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
Práce rozvětvení je distribuována do několika instancí funkce F2
. Práce se sleduje pomocí dynamického seznamu úkolů.
Task.WhenAll
je volána k čekání na dokončení všech volaných funkcí. Výstupy funkce F2
se pak agregují ze seznamu dynamických úkolů a předají se funkci F3
.
Automatické vytváření kontrolních bodů, které se děje při await
volání na Task.WhenAll
, zajišťuje, že případná havárie nebo restart během procesu nevyžaduje opětovné spuštění již dokončené úlohy.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
Práce fan-out je distribuována do několika instancí funkce F2
. Práce se sleduje pomocí dynamického seznamu úkolů.
context.df.Task.all
Rozhraní API se volá, aby čekalo na dokončení všech volaných funkcí. Pak se výstupy funkce F2
agregují ze seznamu dynamických úkolů a předají se funkci F3
.
Automatické vytváření kontrolních bodů, které nastává při volání yield
na context.df.Task.all
, zajišťuje, že nenadálý pád systému nebo restartování v průběhu nevyžaduje opětovné spuštění již dokončené úlohy.
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
Pracovní rozdělení je distribuováno do několika instancí funkce F2
. Práce se sleduje pomocí dynamického seznamu úkolů.
context.task_all
Rozhraní API se volá, aby počkalo na dokončení všech volaných funkcí.
F2
Výstupy funkce se pak agregují ze seznamu dynamických úkolů a předají funkciF3
.
Automatické vytváření kontrolních bodů, které se děje při yield
volání, context.task_all
zajišťuje, že potenciální chybové ukončení nebo restartování uprostřed cesty nevyžaduje restartování již dokončené úlohy.
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
Práce fan-out je rozdělena do několika instancí funkce F2
. Všimněte si použití přepínače NoWait
při vyvolání funkce F2
: tento přepínač umožňuje orchestrátoru pokračovat ve volání F2
bez čekání na dokončení činnosti. Práce se sleduje pomocí dynamického seznamu úkolů. Příkaz Wait-ActivityFunction
se volá, aby počkal, až se dokončí všechny volané funkce.
F2
Výstupy funkce se pak agregují ze seznamu dynamických úkolů a předají funkciF3
.
Automatické vytváření kontrolních bodů, které se děje při Wait-ActivityFunction
volání, zajišťuje, že potenciální chybové ukončení nebo restartování uprostřed cesty nevyžaduje restartování již dokončené úlohy.
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
Rozvětvená práce je distribuována do několika instancí funkce F2
. Práce se sleduje pomocí dynamického seznamu úkolů.
ctx.allOf(parallelTasks).await()
je volán, aby vyčkal na dokončení všech volaných funkcí.
F2
Výstupy funkce se pak agregují ze seznamu dynamických úloh a vrátí se jako výstup funkce orchestrátoru.
Automatické vytváření kontrolních bodů se děje při .await()
volání, ctx.allOf(parallelTasks)
zajišťuje, že neočekávané recyklování procesu nevyžaduje restartování již dokončených úkolů.
Poznámka:
Ve výjimečných případech je možné, že po dokončení funkce aktivity dojde k chybovému ukončení v okně, ale před uložením jeho dokončení do historie orchestrace. Pokud k tomu dojde, funkce aktivity se po obnovení procesu znovu spustí od začátku.
Vzor č. 3: Asynchronní rozhraní HTTP API
Asynchronní vzor rozhraní HTTP API řeší problém koordinace stavu dlouhotrvajících operací s externími klienty. Běžným způsobem implementace tohoto modelu je, že koncový bod HTTP aktivuje dlouhotrvající akci. Pak přesměrujte klienta na koncový bod stavu, který klient dotazuje, až se operace dokončí.
Durable Functions poskytuje integrovanou podporu pro tento vzor, zjednodušuje nebo dokonce odebírá kód, který potřebujete k zápisu pro interakci s dlouhotrvajícími spouštěními funkcí. Ukázky rychlého startu Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell a Java) ukazují jednoduchý příkaz REST, který můžete použít ke spuštění nových instancí funkcí orchestrátoru. Po spuštění instance rozšíření zveřejňuje rozhraní API HTTP webhooku, která se dotazují na stav funkce orchestrátoru.
Následující příklad ukazuje příkazy REST, které spouští orchestrátor a dotazuje se na jeho stav. Kvůli přehlednosti jsou některé podrobnosti protokolu z příkladu vynechány.
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}
Vzhledem k tomu, že modul runtime Durable Functions spravuje stav za vás, nemusíte implementovat vlastní mechanismus sledování stavu.
Rozšíření Durable Functions zveřejňuje integrovaná rozhraní API HTTP, která spravují dlouhotrvající orchestrace. Tento vzor můžete implementovat sami pomocí vlastních triggerů funkcí (například HTTP, fronty nebo služby Azure Event Hubs) a trvalé vazby klienta. K aktivaci ukončení můžete například použít zprávu fronty. Nebo můžete použít trigger HTTP, který je chráněný zásadami ověřování Microsoft Entra místo integrovaných rozhraní HTTP API, která k ověřování používají vygenerovaný klíč.
Další informace najdete v článku o funkcích HTTP, který vysvětluje, jak můžete zveřejnit asynchronní dlouhotrvající procesy přes HTTP pomocí rozšíření Durable Functions.
Vzor č. 4: Monitor
Vzor monitorování odkazuje na flexibilní opakující se proces v pracovním postupu. Příkladem je dotazování, dokud nebudou splněny konkrétní podmínky. Pomocí běžného triggeru časovače můžete řešit základní scénář, jako je například pravidelná úloha čištění, ale jeho interval je statický a správa životností instancí je složitá. Durable Functions můžete použít k vytváření flexibilních intervalů opakování, správě životností úloh a vytváření více procesů monitorování z jedné orchestrace.
Příkladem modelu monitorování je obrácení předchozího asynchronního scénáře rozhraní HTTP API. Místo vystavení koncového bodu pro externího klienta k monitorování dlouhotrvající operace využívá dlouhotrvající monitor externí koncový bod a čeká na změnu stavu.
V několika řádcích kódu můžete pomocí Durable Functions vytvořit několik monitorů, které sledují libovolné koncové body. Monitorování můžou ukončit provádění, pokud je splněna podmínka, nebo jiná funkce může k ukončení monitorování použít trvalý orchestrační klient. Interval monitorování wait
můžete změnit na základě konkrétní podmínky (například exponenciální zpoždnění).)
Následující kód implementuje základní monitorování:
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
Po přijetí požadavku se pro toto ID úlohy vytvoří nová instance orchestrace. Instance opakovaně kontroluje stav, dokud není splněna podmínka, nebo dokud nevyprší časový limit. Trvalý časovač řídí interval dotazování. Poté lze provést více práce, nebo orchestrace může být ukončena.
Vzor č. 5: Lidská interakce
Mnoho automatizovaných procesů zahrnuje určitý druh lidské interakce. Zapojení lidí do automatizovaného procesu je složité, protože lidé nejsou tak vysoce dostupní ani tak pohotově reagující jako cloudové služby. Automatizovaný proces může této interakci umožnit pomocí časových limitů a logiky kompenzace.
Proces schvalování je příkladem obchodního procesu, který zahrnuje lidskou interakci. Schválení manažera může být vyžadováno pro sestavu výdajů, která překračuje určitou částku v dolarech. Pokud manažer zprávu o výdajích neschválí do 72 hodin (může jít o manažera na dovolenou), zahájí se proces eskalace, který získá schválení od někoho jiného (možná manažera).
Vzor v tomto příkladu můžete implementovat pomocí funkce orchestrátoru. Orchestrátor používá trvalý časovač k vyžádání schválení. Orchestrátor eskaluje, pokud dojde k vypršení časového limitu. Orchestrátor čeká na externí událost, například na oznámení vygenerované lidskou interakcí.
Tyto příklady vytvoří schvalovací proces, který předvede vzor lidské interakce:
[FunctionName("ApprovalWorkflow")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}
Chcete-li vytvořit trvalý časovač, zavolejte context.CreateTimer
. Oznámení obdrží context.WaitForExternalEvent
. Potom je Task.WhenAny
zavolána pro rozhodnutí, zda eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).
const df = require("durable-functions");
const moment = require('moment');
module.exports = df.orchestrator(function*(context) {
yield context.df.callActivity("RequestApproval");
const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
const durableTimeout = context.df.createTimer(dueTime.toDate());
const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
if (winningEvent === approvalEvent) {
durableTimeout.cancel();
yield context.df.callActivity("ProcessApproval", approvalEvent.result);
} else {
yield context.df.callActivity("Escalate");
}
});
Chcete-li vytvořit trvalý časovač, zavolejte context.df.createTimer
. Oznámení obdrží context.df.waitForExternalEvent
. Potom se zavolá k rozhodnutí, context.df.Task.any
jestli se má eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
yield context.call_activity("RequestApproval", None)
due_time = context.current_utc_datetime + timedelta(hours=72)
durable_timeout_task = context.create_timer(due_time)
approval_event_task = context.wait_for_external_event("ApprovalEvent")
winning_task = yield context.task_any([approval_event_task, durable_timeout_task])
if approval_event_task == winning_task:
durable_timeout_task.cancel()
yield context.call_activity("ProcessApproval", approval_event_task.result)
else:
yield context.call_activity("Escalate", None)
main = df.Orchestrator.create(orchestrator_function)
Chcete-li vytvořit trvalý časovač, volejte context.create_timer
. Oznámení obdrží context.wait_for_external_event
. Potom je vyvolána pro rozhodnutí, context.task_any
zda se má eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).
param($Context)
$output = @()
$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId
$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId
$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait
$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any
if ($approvalEvent -eq $firstEvent) {
Stop-DurableTimerTask -Task $durableTimeoutEvent
$output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
$output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}
$output
Chcete-li vytvořit trvalý časovač, volejte Start-DurableTimer
. Oznámení obdrží Start-DurableExternalEventListener
. Pak se zavolá Wait-DurableTask
, aby se rozhodlo, jestli se má eskalovat (pokud nejdříve vyprší časový limit), nebo zpracovat schválení (pokud se schválení obdrží před vypršením časového limitu).
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
Volání ctx.waitForExternalEvent(...).await()
metody pozastaví orchestraci, dokud neobdrží událost s názvem ApprovalEvent
, která má datový boolean
obsah. Pokud je událost přijata, volá se funkce aktivity pro zpracování výsledku schválení. Pokud se však žádná taková událost nepřijala před vypršením timeout
platnosti (72 hodin), TaskCanceledException
vyvolá se a Escalate
volá se funkce aktivity.
Poznámka:
Za dobu strávenou čekáním na externí události při spuštění v plánu Consumption se neúčtují žádné poplatky.
Externí klient může doručit oznámení události do funkce čekající orchestrátoru pomocí integrovaných rozhraní API HTTP:
curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"
Událost lze vyvolat také pomocí klienta trvalé orchestrace z jiné funkce ve stejné aplikaci funkcí:
[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
[HttpTrigger] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
bool isApproved = true;
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
import azure.durable_functions as df
async def main(client: str):
durable_client = df.DurableOrchestrationClient(client)
is_approved = True
await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"
@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
@HttpTrigger(name = "instanceId") String instanceId,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
client.raiseEvent(instanceId, "ApprovalEvent", true);
}
Vzor č. 6: Agregátor (stavové entity)
Šestý vzor se týká agregace dat událostí v určitém časovém období do jedné adresovatelné entity. V tomto modelu můžou agregovaná data pocházet z více zdrojů, mohou být doručena v dávkách nebo mohou být rozptýlená po dlouhou dobu. Agregátor může potřebovat provést akci s daty událostí při jejich doručení a externí klienti se můžou muset dotazovat na agregovaná data.
Složitá věc, když se snažíte implementovat tento vzor s normálními bezstavovými funkcemi, je, že řízení souběžnosti se stává obrovskou výzvou. Nemusíte se starat jenom o několik vláken, které upravují stejná data současně, ale také se musíte starat o zajištění toho, aby agregátor běžel jenom na jednom virtuálním počítači najednou.
Durable entity můžete použít k snadné implementaci tohoto vzoru jako jedné funkce.
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
Odolné entity lze také modelovat jako třídy v .NET. Tento model může být užitečný, pokud je seznam operací pevný a bude velký. Následující příklad je ekvivalentní implementací Counter
entity pomocí tříd a metod .NET.
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");
module.exports = df.entity(function(context) {
const currentValue = context.df.getState(() => 0);
switch (context.df.operationName) {
case "add":
const amount = context.df.getInput();
context.df.setState(currentValue + amount);
break;
case "reset":
context.df.setState(0);
break;
case "get":
context.df.return(currentValue);
break;
}
});
import azure.functions as func
import azure.durable_functions as df
def entity_function(context: df.DurableOrchestrationContext):
current_value = context.get_state(lambda: 0)
operation = context.operation_name
if operation == "add":
amount = context.get_input()
current_value += amount
context.set_result(current_value)
elif operation == "reset":
current_value = 0
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
main = df.Entity.create(entity_function)
Poznámka:
Odolné entity se v PowerShellu v současné době nepodporují.
Poznámka:
Odolné entity nejsou v současné době v Javě podporovány.
Klienti mohou vytvořit frontu operací pro (označované také jako signalizační) funkci entity pomocí vazby klienta entity.
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
Poznámka:
Dynamicky generované proxy jsou v .NET také k dispozici pro signalizaci entit způsobem zajišťujícím bezpečnost typů. Kromě signalizace mohou klienti také dotazovat stav funkce entity pomocí metod bezpečných typů na vazbě klienta orchestrace.
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
entity_id = df.EntityId("Counter", "myCounter")
instance_id = await client.signal_entity(entity_id, "add", 1)
return func.HttpResponse("Entity signaled")
Funkce entit jsou k dispozici v Durable Functions 2.0 a vyšší pro C#, JavaScript a Python.
Technologie
Rozšíření Durable Functions je v zákulisí postavené na platformě Durable Task Framework, což je open-source knihovna na GitHubu, která se používá k vytváření pracovních postupů v kódu. Stejně jako Azure Functions je bezserverový vývoj webových úloh Azure, Durable Functions je bezserverový vývoj architektury Durable Task Framework. Microsoft a další organizace používají architekturu Durable Task Framework k rozsáhlé automatizaci důležitých procesů. Je přirozeně vhodné pro bezserverové prostředí Azure Functions.
Omezení kódu
Aby bylo možné poskytovat spolehlivé a dlouhotrvající záruky provádění, mají funkce orchestrátoru sadu pravidel kódování, která je potřeba dodržovat. Další informace najdete v tématu Omezení kódu funkce orchestratoru.
Fakturace
Durable Functions se účtuje stejně jako Azure Functions. Další informace najdete v tématu Ceny služby Azure Functions. Při spouštění funkcí orchestrátoru v plánu Consumption služby Azure Functions existuje několik fakturačních chování, o nichž je potřeba vědět. Další informace o těchto chováních najdete v článku fakturace Durable Functions.
Ponořte se rovnou do toho
S Durable Functions můžete začít pracovat za méně než 10 minut dokončením některého z těchto kurzů rychlých startů pro konkrétní jazyk:
- C# pomocí sady Visual Studio
- JavaScript s využitím editoru Visual Studio Code
- TypeScript s využitím editoru Visual Studio Code
- Python s využitím editoru Visual Studio Code
- PowerShell s využitím editoru Visual Studio Code
- Java s využitím Mavenu
V těchto rychlých startech místně vytvoříte a otestujete odolnou funkci Hello World . Kód funkce potom publikujete do Azure. Funkce, kterou vytvoříte, orchestruje a zřetědí volání dalších funkcí.
Publikace
Durable Functions se vyvíjí ve spolupráci s Microsoft Research. V důsledku toho tým Durable Functions aktivně vytváří výzkumné studie a artefakty; mezi ně patří:
- Durable Functions: Sémantika pro stavové bezserverové aplikace(OOPSLA'21)
- Bezserverové pracovní postupy s Durable Functions a Netherite(preprint)
Ukázka videa
Následující video ukazuje výhody Durable Functions:
Další možnosti orchestrace
Durable Functions je rozšířené rozšíření pro Azure Functions a nemusí být vhodné pro všechny aplikace. Porovnání s jinými technologiemi orchestrace Azure najdete v tématu Porovnání Azure Functions a Azure Logic Apps.