Sdílet prostřednictvím


Co je Durable Functions?

Durable Functions je funkce Azure Functions, která umožňuje psát stavové funkce v bezserverovém výpočetním prostředí. Rozšíření umožňuje definovat stavové pracovní postupy tím, že pomocí programovacího modelu Azure Functions napíšete funkce orchestrátoru a stavové entity. Rozšíření v zákulisí spravuje stav, kontrolní body a restartování, což vám umožňuje soustředit se na obchodní logiku.

Podporované jazyky

Durable Functions je navržený tak, aby fungoval se všemi programovacími jazyky Azure Functions, ale pro každý jazyk může mít jiné minimální požadavky. Následující tabulka uvádí minimální podporované konfigurace aplikací:

Zásobník jazyků Verze modulu runtime Azure Functions Verze jazykového pracovníka Minimální verze sad
.NET / C# / F# Funkce 1.0+ Probíhá proces
Mimoprocesové
Není k dispozici
JavaScript/TypeScript (programovací model verze 3) Funkce 2.0+ Node.js 8 a novější Balíčky 2.x
JavaScript/TypeScript (programovací model v4) Funkce 4.25 a novější Node.js ve verzi 18 nebo novější 3.15+ sady
Python Funkce 2.0+ Python 3.7 nebo novější Balíčky 2.x
Python (v2 programovací model) Funkce 4.0 a novější Python 3.7 nebo novější 3.15+ sady
PowerShell Funkce 3.0 a novější PowerShell 7 a novější Sady 2.x
Java Funkce 4.0 a novější Java 8 a novější Balíčky 4.x

Důležité

Tento článek používá karty pro podporu více verzí programovacího modelu Node.js. Model v4 je obecně dostupný a je navržený tak, aby měl flexibilnější a intuitivnější prostředí pro vývojáře v JavaScriptu a TypeScriptu. Další podrobnosti o tom, jak model v4 funguje, najdete v příručce pro vývojáře služby Azure Functions Node.js. Další informace o rozdílech mezi v3 a v4 najdete v průvodci migrací.

Důležité

Tento článek používá karty pro podporu více verzí programovacího modelu Pythonu. Model v2 je obecně dostupný a je navržený tak, aby poskytoval způsob vytváření funkcí, který je více zaměřený na kód, prostřednictvím dekorátorů. Další informace o modelu v2 najdete v příručce pro vývojáře v Pythonu pro Azure Functions.

Podobně jako Azure Functions existují šablony, které vám pomůžou vyvíjet Durable Functions pomocí sady Visual Studio, Visual Studio Code a webu Azure Portal.

Vzory aplikací

Primárním případem použití Durable Functions je zjednodušení složitých a stavových požadavků koordinace v bezserverových aplikacích. Následující části popisují typické vzory aplikací, které můžou využívat Durable Functions:

Pattern č. 1: Řetězení funkcí

V modelu zřetězení funkcí se posloupnost funkcí provede v určitém pořadí. V tomto vzoru se výstup jedné funkce použije na vstup jiné funkce. Použití front mezi jednotlivými funkcemi zajišťuje, že systém zůstane trvalý a škálovatelný, i když existuje tok řízení z jedné funkce do další.

Diagram modelu zřetězování funkcí

Durable Functions můžete použít k implementaci modelu řetězení funkcí stručně, jak je znázorněno v následujícím příkladu.

V tomto příkladu jsou hodnoty F1, F2, F3a F4 jsou názvy jiných funkcí ve stejné aplikaci funkcí. Tok řízení můžete implementovat pomocí normálních imperativních konstruktorů kódování. Kód se spustí shora dolů. Kód může zahrnovat sémantiku toku řízení jazyka, jako jsou podmíněné výrazy a smyčky. Logiku zpracování chyb můžete zahrnout do try/catch/finally bloků.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

Tento parametr context můžete použít k vyvolání dalších funkcí podle jména, předání parametrů a návratu výstupu funkce. Pokaždé, když kód volá await, Durable Functions framework kontrolní body průběh aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího await volání. Další informace najdete v následující části: Vzor č. 2: Fan-out/fan-in.

V tomto příkladu jsou hodnoty F1, F2, F3a F4 jsou názvy jiných funkcí ve stejné aplikaci funkcí. Tok řízení můžete implementovat pomocí normálních imperativních konstruktorů kódování. Kód se spustí shora dolů. Kód může zahrnovat existující řídicí struktury jazyka, jako jsou podmínky a smyčky. Logiku zpracování chyb můžete zahrnout do try/catch/finally bloků.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

Objekt context.df můžete použít k volání dalších funkcí podle jejich názvu, předávání parametrů a získání výstupu funkce. Pokaždé, když kód volá yield, framework Durable Functions vytvoří kontrolní bod pro zaznamenání průběhu aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího yield volání. Další informace najdete v další části Vzor č. 2: Fan-out/fan-in.

Poznámka:

Objekt context v JavaScriptu představuje celý kontext funkce. Přístup k kontextu Durable Functions pomocí df vlastnosti v hlavním kontextu.

V tomto příkladu jsou hodnoty F1, F2, F3a F4 jsou názvy jiných funkcí ve stejné aplikaci funkcí. Tok řízení můžete implementovat pomocí normálních imperativních konstruktorů kódování. Kód se spustí shora dolů. Kód může zahrnovat řídící struktury jazyka, jako jsou podmínky a cykly.

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result

main = df.Orchestrator.create(orchestrator_function)

Objekt context můžete použít k vyvolání dalších funkcí podle názvu, předání parametrů a získání výstupu funkce. Pokaždé, když kód volá yield, rámec Durable Functions připraví kontrolní bod postupu aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího yield volání. Další informace najdete v další kapitole, Vzor č. 2: Fan-out/fan-in.

Poznámka:

Objekt context v Pythonu představuje kontext orchestrace. Přístup k hlavnímu function_context kontextu Azure Functions pomocí vlastnosti v kontextu orchestrace.

V tomto příkladu jsou hodnoty F1, F2, F3a F4 jsou názvy jiných funkcí ve stejné aplikaci funkcí. Tok řízení můžete implementovat pomocí normálních imperativních konstruktorů kódování. Kód se spustí shora dolů. Kód může zahrnovat stávající principy řízení toku programu, jako jsou podmínky a cykly.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

Příkaz můžete použít Invoke-DurableActivity k vyvolání dalších funkcí podle názvu, předání parametrů a výstupu návratové funkce. Pokaždé, když kód volá Invoke-DurableActivity bez NoWait přepínače, architektura Durable Functions zkontroluje průběh aktuální instance funkce. Pokud proces nebo virtuální počítač recykluje uprostřed provádění, instance funkce se obnoví z předchozího Invoke-DurableActivity volání. Další informace najdete v další sekci Vzor č. 2: Fan-out/fan-in.

V tomto příkladu jsou hodnoty F1, F2, F3a F4 jsou názvy jiných funkcí ve stejné aplikaci funkcí. Tok řízení můžete implementovat pomocí normálních imperativních konstruktorů kódování. Kód se spustí shora dolů. Kód může obsahovat sémantiku řízení toku programu, jako jsou podmínky a smyčky.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

Objekt ctx můžete použít k vyvolání dalších funkcí podle názvu, pro předání parametrů a vrácení výstupu funkce. Výstup těchto metod je objekt, Task<V> kde V je typ dat vrácený vyvolanou funkcí. Pokaždé, když zavoláte Task<V>.await(), framework Durable Functions ukládá kontrolní body pro průběh aktuální instance funkce. Pokud proces neočekávaně recykluje uprostřed provádění, instance funkce se obnoví z předchozího Task<V>.await() volání. Další informace najdete v následující sekci Vzor č. 2: Fan-out/fan-in.

Vzor č. 2: Fan-out/fan-in

Ve vzoru ventilátoru/ventilátoru spustíte paralelně několik funkcí a pak počkáte na dokončení všech funkcí. U výsledků vrácených z funkcí se často provádí určitá agregační práce.

Diagram vzoru fan-out fan-in

Díky normálním funkcím můžete posílat více zpráv do fronty. Zapojování zpět je mnohem náročnější. Pokud chcete implementovat „fan in“ ve funkci, napíšete kód, který sleduje, kdy končí funkce aktivované frontou, a potom uložíte jejich výstupy.

Rozšíření Durable Functions zpracovává tento vzor s relativně jednoduchým kódem:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

Práce rozvětvení je distribuována do několika instancí funkce F2. Práce se sleduje pomocí dynamického seznamu úkolů. Task.WhenAll je volána k čekání na dokončení všech volaných funkcí. Výstupy funkce F2 se pak agregují ze seznamu dynamických úkolů a předají se funkci F3.

Automatické vytváření kontrolních bodů, které se děje při await volání na Task.WhenAll, zajišťuje, že případná havárie nebo restart během procesu nevyžaduje opětovné spuštění již dokončené úlohy.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

Práce fan-out je distribuována do několika instancí funkce F2. Práce se sleduje pomocí dynamického seznamu úkolů. context.df.Task.all Rozhraní API se volá, aby čekalo na dokončení všech volaných funkcí. Pak se výstupy funkce F2 agregují ze seznamu dynamických úkolů a předají se funkci F3.

Automatické vytváření kontrolních bodů, které nastává při volání yield na context.df.Task.all, zajišťuje, že nenadálý pád systému nebo restartování v průběhu nevyžaduje opětovné spuštění již dokončené úlohy.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

Pracovní rozdělení je distribuováno do několika instancí funkce F2. Práce se sleduje pomocí dynamického seznamu úkolů. context.task_all Rozhraní API se volá, aby počkalo na dokončení všech volaných funkcí. F2 Výstupy funkce se pak agregují ze seznamu dynamických úkolů a předají funkciF3.

Automatické vytváření kontrolních bodů, které se děje při yield volání, context.task_all zajišťuje, že potenciální chybové ukončení nebo restartování uprostřed cesty nevyžaduje restartování již dokončené úlohy.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

Práce fan-out je rozdělena do několika instancí funkce F2. Všimněte si použití přepínače NoWait při vyvolání funkce F2: tento přepínač umožňuje orchestrátoru pokračovat ve volání F2 bez čekání na dokončení činnosti. Práce se sleduje pomocí dynamického seznamu úkolů. Příkaz Wait-ActivityFunction se volá, aby počkal, až se dokončí všechny volané funkce. F2 Výstupy funkce se pak agregují ze seznamu dynamických úkolů a předají funkciF3.

Automatické vytváření kontrolních bodů, které se děje při Wait-ActivityFunction volání, zajišťuje, že potenciální chybové ukončení nebo restartování uprostřed cesty nevyžaduje restartování již dokončené úlohy.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

Rozvětvená práce je distribuována do několika instancí funkce F2. Práce se sleduje pomocí dynamického seznamu úkolů. ctx.allOf(parallelTasks).await() je volán, aby vyčkal na dokončení všech volaných funkcí. F2 Výstupy funkce se pak agregují ze seznamu dynamických úloh a vrátí se jako výstup funkce orchestrátoru.

Automatické vytváření kontrolních bodů se děje při .await() volání, ctx.allOf(parallelTasks) zajišťuje, že neočekávané recyklování procesu nevyžaduje restartování již dokončených úkolů.

Poznámka:

Ve výjimečných případech je možné, že po dokončení funkce aktivity dojde k chybovému ukončení v okně, ale před uložením jeho dokončení do historie orchestrace. Pokud k tomu dojde, funkce aktivity se po obnovení procesu znovu spustí od začátku.

Vzor č. 3: Asynchronní rozhraní HTTP API

Asynchronní vzor rozhraní HTTP API řeší problém koordinace stavu dlouhotrvajících operací s externími klienty. Běžným způsobem implementace tohoto modelu je, že koncový bod HTTP aktivuje dlouhotrvající akci. Pak přesměrujte klienta na koncový bod stavu, který klient dotazuje, až se operace dokončí.

Diagram znázorňující vzor rozhraní HTTP API

Durable Functions poskytuje integrovanou podporu pro tento vzor, zjednodušuje nebo dokonce odebírá kód, který potřebujete k zápisu pro interakci s dlouhotrvajícími spouštěními funkcí. Ukázky rychlého startu Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell a Java) ukazují jednoduchý příkaz REST, který můžete použít ke spuštění nových instancí funkcí orchestrátoru. Po spuštění instance rozšíření zveřejňuje rozhraní API HTTP webhooku, která se dotazují na stav funkce orchestrátoru.

Následující příklad ukazuje příkazy REST, které spouští orchestrátor a dotazuje se na jeho stav. Kvůli přehlednosti jsou některé podrobnosti protokolu z příkladu vynechány.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Vzhledem k tomu, že modul runtime Durable Functions spravuje stav za vás, nemusíte implementovat vlastní mechanismus sledování stavu.

Rozšíření Durable Functions zveřejňuje integrovaná rozhraní API HTTP, která spravují dlouhotrvající orchestrace. Tento vzor můžete implementovat sami pomocí vlastních triggerů funkcí (například HTTP, fronty nebo služby Azure Event Hubs) a trvalé vazby klienta. K aktivaci ukončení můžete například použít zprávu fronty. Nebo můžete použít trigger HTTP, který je chráněný zásadami ověřování Microsoft Entra místo integrovaných rozhraní HTTP API, která k ověřování používají vygenerovaný klíč.

Další informace najdete v článku o funkcích HTTP, který vysvětluje, jak můžete zveřejnit asynchronní dlouhotrvající procesy přes HTTP pomocí rozšíření Durable Functions.

Vzor č. 4: Monitor

Vzor monitorování odkazuje na flexibilní opakující se proces v pracovním postupu. Příkladem je dotazování, dokud nebudou splněny konkrétní podmínky. Pomocí běžného triggeru časovače můžete řešit základní scénář, jako je například pravidelná úloha čištění, ale jeho interval je statický a správa životností instancí je složitá. Durable Functions můžete použít k vytváření flexibilních intervalů opakování, správě životností úloh a vytváření více procesů monitorování z jedné orchestrace.

Příkladem modelu monitorování je obrácení předchozího asynchronního scénáře rozhraní HTTP API. Místo vystavení koncového bodu pro externího klienta k monitorování dlouhotrvající operace využívá dlouhotrvající monitor externí koncový bod a čeká na změnu stavu.

Diagram znázorňující vzor monitorování

V několika řádcích kódu můžete pomocí Durable Functions vytvořit několik monitorů, které sledují libovolné koncové body. Monitorování můžou ukončit provádění, pokud je splněna podmínka, nebo jiná funkce může k ukončení monitorování použít trvalý orchestrační klient. Interval monitorování wait můžete změnit na základě konkrétní podmínky (například exponenciální zpoždnění).)

Následující kód implementuje základní monitorování:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

Po přijetí požadavku se pro toto ID úlohy vytvoří nová instance orchestrace. Instance opakovaně kontroluje stav, dokud není splněna podmínka, nebo dokud nevyprší časový limit. Trvalý časovač řídí interval dotazování. Poté lze provést více práce, nebo orchestrace může být ukončena.

Vzor č. 5: Lidská interakce

Mnoho automatizovaných procesů zahrnuje určitý druh lidské interakce. Zapojení lidí do automatizovaného procesu je složité, protože lidé nejsou tak vysoce dostupní ani tak pohotově reagující jako cloudové služby. Automatizovaný proces může této interakci umožnit pomocí časových limitů a logiky kompenzace.

Proces schvalování je příkladem obchodního procesu, který zahrnuje lidskou interakci. Schválení manažera může být vyžadováno pro sestavu výdajů, která překračuje určitou částku v dolarech. Pokud manažer zprávu o výdajích neschválí do 72 hodin (může jít o manažera na dovolenou), zahájí se proces eskalace, který získá schválení od někoho jiného (možná manažera).

Diagram vzoru lidské interakce

Vzor v tomto příkladu můžete implementovat pomocí funkce orchestrátoru. Orchestrátor používá trvalý časovač k vyžádání schválení. Orchestrátor eskaluje, pokud dojde k vypršení časového limitu. Orchestrátor čeká na externí událost, například na oznámení vygenerované lidskou interakcí.

Tyto příklady vytvoří schvalovací proces, který předvede vzor lidské interakce:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Chcete-li vytvořit trvalý časovač, zavolejte context.CreateTimer. Oznámení obdrží context.WaitForExternalEvent. Potom je Task.WhenAny zavolána pro rozhodnutí, zda eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Chcete-li vytvořit trvalý časovač, zavolejte context.df.createTimer. Oznámení obdrží context.df.waitForExternalEvent. Potom se zavolá k rozhodnutí, context.df.Task.any jestli se má eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Chcete-li vytvořit trvalý časovač, volejte context.create_timer. Oznámení obdrží context.wait_for_external_event. Potom je vyvolána pro rozhodnutí, context.task_any zda se má eskalovat (dojde k vypršení časového limitu), nebo zpracovat schválení (schválení se obdrží před vypršením časového limitu).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Chcete-li vytvořit trvalý časovač, volejte Start-DurableTimer. Oznámení obdrží Start-DurableExternalEventListener. Pak se zavolá Wait-DurableTask, aby se rozhodlo, jestli se má eskalovat (pokud nejdříve vyprší časový limit), nebo zpracovat schválení (pokud se schválení obdrží před vypršením časového limitu).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

Volání ctx.waitForExternalEvent(...).await() metody pozastaví orchestraci, dokud neobdrží událost s názvem ApprovalEvent, která má datový boolean obsah. Pokud je událost přijata, volá se funkce aktivity pro zpracování výsledku schválení. Pokud se však žádná taková událost nepřijala před vypršením timeout platnosti (72 hodin), TaskCanceledException vyvolá se a Escalate volá se funkce aktivity.

Poznámka:

Za dobu strávenou čekáním na externí události při spuštění v plánu Consumption se neúčtují žádné poplatky.

Externí klient může doručit oznámení události do funkce čekající orchestrátoru pomocí integrovaných rozhraní API HTTP:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

Událost lze vyvolat také pomocí klienta trvalé orchestrace z jiné funkce ve stejné aplikaci funkcí:

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Vzor č. 6: Agregátor (stavové entity)

Šestý vzor se týká agregace dat událostí v určitém časovém období do jedné adresovatelné entity. V tomto modelu můžou agregovaná data pocházet z více zdrojů, mohou být doručena v dávkách nebo mohou být rozptýlená po dlouhou dobu. Agregátor může potřebovat provést akci s daty událostí při jejich doručení a externí klienti se můžou muset dotazovat na agregovaná data.

Diagram znázorňující agregátor

Složitá věc, když se snažíte implementovat tento vzor s normálními bezstavovými funkcemi, je, že řízení souběžnosti se stává obrovskou výzvou. Nemusíte se starat jenom o několik vláken, které upravují stejná data současně, ale také se musíte starat o zajištění toho, aby agregátor běžel jenom na jednom virtuálním počítači najednou.

Durable entity můžete použít k snadné implementaci tohoto vzoru jako jedné funkce.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Odolné entity lze také modelovat jako třídy v .NET. Tento model může být užitečný, pokud je seznam operací pevný a bude velký. Následující příklad je ekvivalentní implementací Counter entity pomocí tříd a metod .NET.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Poznámka:

Odolné entity se v PowerShellu v současné době nepodporují.

Poznámka:

Odolné entity nejsou v současné době v Javě podporovány.

Klienti mohou vytvořit frontu operací pro (označované také jako signalizační) funkci entity pomocí vazby klienta entity.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Poznámka:

Dynamicky generované proxy jsou v .NET také k dispozici pro signalizaci entit způsobem zajišťujícím bezpečnost typů. Kromě signalizace mohou klienti také dotazovat stav funkce entity pomocí metod bezpečných typů na vazbě klienta orchestrace.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Funkce entit jsou k dispozici v Durable Functions 2.0 a vyšší pro C#, JavaScript a Python.

Technologie

Rozšíření Durable Functions je v zákulisí postavené na platformě Durable Task Framework, což je open-source knihovna na GitHubu, která se používá k vytváření pracovních postupů v kódu. Stejně jako Azure Functions je bezserverový vývoj webových úloh Azure, Durable Functions je bezserverový vývoj architektury Durable Task Framework. Microsoft a další organizace používají architekturu Durable Task Framework k rozsáhlé automatizaci důležitých procesů. Je přirozeně vhodné pro bezserverové prostředí Azure Functions.

Omezení kódu

Aby bylo možné poskytovat spolehlivé a dlouhotrvající záruky provádění, mají funkce orchestrátoru sadu pravidel kódování, která je potřeba dodržovat. Další informace najdete v tématu Omezení kódu funkce orchestratoru.

Fakturace

Durable Functions se účtuje stejně jako Azure Functions. Další informace najdete v tématu Ceny služby Azure Functions. Při spouštění funkcí orchestrátoru v plánu Consumption služby Azure Functions existuje několik fakturačních chování, o nichž je potřeba vědět. Další informace o těchto chováních najdete v článku fakturace Durable Functions.

Ponořte se rovnou do toho

S Durable Functions můžete začít pracovat za méně než 10 minut dokončením některého z těchto kurzů rychlých startů pro konkrétní jazyk:

V těchto rychlých startech místně vytvoříte a otestujete odolnou funkci Hello World . Kód funkce potom publikujete do Azure. Funkce, kterou vytvoříte, orchestruje a zřetědí volání dalších funkcí.

Publikace

Durable Functions se vyvíjí ve spolupráci s Microsoft Research. V důsledku toho tým Durable Functions aktivně vytváří výzkumné studie a artefakty; mezi ně patří:

Ukázka videa

Následující video ukazuje výhody Durable Functions:

Další možnosti orchestrace

Durable Functions je rozšířené rozšíření pro Azure Functions a nemusí být vhodné pro všechny aplikace. Porovnání s jinými technologiemi orchestrace Azure najdete v tématu Porovnání Azure Functions a Azure Logic Apps.