Delen via


Wat zijn Durable Functions?

Durable Functions is een functie van Azure Functions waarmee u stateful functies kunt schrijven in een serverloze rekenomgeving. Met de extensie kunt u stateful werkstromen definiëren door Orchestrator-functies te schrijven en stateful entiteiten door entiteitsfuncties te schrijven met behulp van het Azure Functions-programmeermodel. Achter de schermen beheert de extensie status, controlepunten en het opnieuw opstarten, zodat u zich kunt concentreren op uw bedrijf.

Ondersteunde talen

Durable Functions is ontworpen om te werken met alle Azure Functions-programmeertalen, maar heeft mogelijk verschillende minimale vereisten voor elke taal. In de volgende tabel ziet u de minimaal ondersteunde app-configuraties:

Taalstack Runtimeversies van Azure Functions Versie voor taalarbeider Minimale bundelversie
.NET / C# / F# Functions 1.0+ In-proces
buiten het proces
n.v.t.
JavaScript/TypeScript (v3 prog. model) Functions 2.0+ Node 8+ 2.x bundels
JavaScript/TypeScript (v4 prog. model) Functions 4.25+ Node 18+ 3.15+ bundels
Python Functions 2.0+ Python 3.7+ 2.x bundels
Python (v2 prog. model) Functions 4.0+ Python 3.7+ 3.15+ bundels
Powershell Functions 3.0+ PowerShell 7+ 2.x bundels
Java Functions 4.0+ Java 8+ 4.x bundels

Belangrijk

In dit artikel worden tabbladen gebruikt ter ondersteuning van meerdere versies van het Node.js programmeermodel. Het v4-model is algemeen beschikbaar en is ontworpen voor een flexibelere en intuïtievere ervaring voor JavaScript- en TypeScript-ontwikkelaars. Raadpleeg de ontwikkelaarshandleiding voor Azure Functions Node.js voor meer informatie over hoe het v4-model werkt. Raadpleeg de migratiehandleiding voor meer informatie over de verschillen tussen v3 en v4.

Belangrijk

In dit artikel worden tabbladen gebruikt ter ondersteuning van meerdere versies van het Python-programmeermodel. Het v2-model is algemeen beschikbaar en is ontworpen om een meer codegerichte manier te bieden voor het ontwerpen van functies via decorators. Zie de Ontwikkelaarshandleiding voor Azure Functions Python voor meer informatie over het v2-model.

Net als Azure Functions zijn er sjablonen voor het ontwikkelen van Durable Functions met behulp van Visual Studio, Visual Studio Code en Azure Portal.

Toepassingspatronen

Het primaire toepassingsgeval voor Durable Functions is het vereenvoudigen van complexe vereisten voor coördinatie met status in serverloze applicaties. In de volgende secties worden enkele typische toepassingspatronen beschreven die kunnen profiteren van Durable Functions:

Patroon 1: Functiekoppeling

In het patroon voor functiekoppeling wordt een reeks functies in een specifieke volg orde uitgevoerd. In dit patroon wordt de uitvoer van een functie toegepast op de invoer van een andere functie. Het gebruik van wachtrijen tussen elke functie zorgt ervoor dat het systeem duurzaam en schaalbaar blijft, ook al is er een controlestroom van de ene functie naar de volgende.

Diagram van het patroon voor functiekoppeling.

U kun Durable Functions gebruiken om het patroon voor functiekoppeling bondig te implementeren zoals in het volgende voorbeeld.

In dit voorbeeld zijn de waarden F1, F2, F3 en F4 de namen van andere functies in dezelfde functie-app. U kunt een controlestroom implementeren met behulp van normale imperatieve codeconstructies. Code wordt van boven naar beneden uitgevoerd. De code kan bestaande taalsemantiek voor controlestromen bevatten, zoals voorwaarden en lussen. U kunt logica voor foutafhandeling toevoegen in try/catch/finally blokken.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

U kunt de context-parameter gebruiken om andere functies aan te roepen op naam, parameters door te geven en functie-uitvoer te retourneren. Telkens de code await aanroept controleert het Durable Functions-framework de voortgang van de huidige functie-instantie. Als het proces of de virtuele machine halverwege tijdens de uitvoering recycleert, dan begint de functie-instantie opnieuw vanaf de voorgaande await-aanroep. Zie de volgende sectie Pattern #2: Fan-out/fan-in voor meer informatie.

In dit voorbeeld zijn de waarden F1, F2, F3 en F4 de namen van andere functies in dezelfde functie-app. U kunt een controlestroom implementeren met behulp van normale imperatieve codeconstructies. Code wordt van boven naar beneden uitgevoerd. De code kan bestaande taalsemantiek voor controlestromen bevatten, zoals voorwaarden en lussen. U kunt logica voor foutafhandeling toevoegen in try/catch/finally blokken.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

U kunt het context.df-object gebruiken om andere functies aan te roepen op naam, parameters door te geven en functie-uitvoer te retourneren. Telkens de code yield aanroept controleert het Durable Functions-framework de voortgang van de huidige functie-instantie. Als het proces of de virtuele machine halverwege tijdens de uitvoering recycleert, dan begint de functie-instantie opnieuw vanaf de voorgaande yield-aanroep. Zie de volgende sectie Pattern #2: Fan-out/fan-in voor meer informatie.

Notitie

Het context-object in Javascript vertegenwoordigt de volledige functiecontext. Open de Durable Functions-context met behulp van de eigenschap df in de hoofdcontext.

In dit voorbeeld zijn de waarden F1, F2, F3 en F4 de namen van andere functies in dezelfde functie-app. U kunt een controlestroom implementeren met behulp van normale imperatieve codeconstructies. Code wordt van boven naar beneden uitgevoerd. De code kan bestaande taalsemantiek voor controlestromen bevatten, zoals voorwaarden en lussen.

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result

main = df.Orchestrator.create(orchestrator_function)

U kunt het context-object gebruiken om andere functies aan te roepen op naam, parameters door te geven en functie-uitvoer te retourneren. Telkens de code yield aanroept controleert het Durable Functions-framework de voortgang van de huidige functie-instantie. Als het proces of de virtuele machine halverwege tijdens de uitvoering recycleert, dan begint de functie-instantie opnieuw vanaf de voorgaande yield-aanroep. Zie de volgende sectie Pattern #2: Fan-out/fan-in voor meer informatie.

Notitie

Het context-object in Python vertegenwoordigt de orkestratiecontext. Krijg toegang tot de hoofdcontext van Azure Functions via de eigenschap function_context in de orkestratie-context.

In dit voorbeeld zijn de waarden F1, F2, F3 en F4 de namen van andere functies in dezelfde functie-app. U kunt een controlestroom implementeren met behulp van normale imperatieve codeconstructies. Code wordt van boven naar beneden uitgevoerd. De code kan bestaande taalsemantiek voor controlestromen bevatten, zoals voorwaarden en lussen.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

U kunt de opdracht Invoke-DurableActivity gebruiken om andere functies aan te roepen op naam, parameters door te geven en functie-uitvoer te retourneren. Elke keer dat de code Invoke-DurableActivity aanroept zonder de schakelaar NoWait, legt het Durable Functions-framework een controlepunt van de voortgang van de huidige functie-instantie vast. Als het proces of de virtuele machine halverwege tijdens de uitvoering recycleert, dan begint de functie-instantie opnieuw vanaf de voorgaande Invoke-DurableActivity-aanroep. Zie de volgende sectie Pattern #2: Fan-out/fan-in voor meer informatie.

In dit voorbeeld zijn de waarden F1, F2, F3 en F4 de namen van andere functies in dezelfde functie-app. U kunt een controlestroom implementeren met behulp van normale imperatieve codeconstructies. Code wordt van boven naar beneden uitgevoerd. De code kan bestaande taalsemantiek voor controlestromen bevatten, zoals voorwaarden en lussen.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

U kunt het ctx-object gebruiken om andere functies aan te roepen op naam, parameters door te geven en functie-uitvoer te retourneren. De uitvoer van deze methoden is een Task<V> object waarin het type gegevens is dat V wordt geretourneerd door de aangeroepen functie. Telkens wanneer u aanroept Task<V>.await(), controleert het Durable Functions-framework de voortgang van het huidige functie-exemplaar. Als het proces onverwacht halverwege de uitvoering opnieuw wordt gestart, wordt het functie-exemplaar hervat vanaf de voorgaande Task<V>.await() aanroep. Zie de volgende sectie Pattern #2: Fan-out/fan-in voor meer informatie.

Patroon 2: Fan-out/fan-in

In het fan-out/fan-in patroon voert u meerdere functies parallel uit en wacht u totdat alle functies zijn voltooid. Vaak wordt er een aggregatie toegepast op de resultaten die worden geretourneerd door de functies.

Diagram van het fan-out fan-in patroon.

Bij normale functies kun je uitwaaieren door de functie meerdere berichten naar een wachtrij te laten sturen. Terug inwaaieren is veel lastiger. Om in een normale functie in te waaieren schrijft u code om op te volgen wanneer de in de wachtrij geactiveerde functies aflopen en slaat u vervolgens de functie-uitvoer op.

De Durable Functions-extensie verwerkt dit patroon met relatief eenvoudige code:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

Het fan-out proces wordt verdeeld over meerdere instanties van de F2 functie. Het werk wordt opgevolgd met een dynamische takenlijst. Task.WhenAll wordt aangeroepen om te wachten tot alle aangeroepen functies voltooid zijn. Vervolgens wordt de F2-functie-uitvoer samengevoegd uit de dynamische takenlijst en doorgegeven naar de F3-functie.

Het automatisch plaatsen van controlepunten bij de await-aanroep op Task.WhenAll zorgt ervoor dat een reeds voltooide taak niet opnieuw moet worden opgestart bij een potentiële crash of reboot halverwege.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

Het fan-out werk wordt verdeeld over meerdere instanties van de F2-functie. Het werk wordt opgevolgd met een dynamische takenlijst. context.df.Task.all-API wordt aangeroepen om te wachten tot alle aangeroepen functies voltooid zijn. Vervolgens wordt de F2-functie-uitvoer samengevoegd uit de dynamische takenlijst en doorgegeven naar de F3-functie.

Het automatisch plaatsen van controlepunten bij de yield-aanroep op context.df.Task.all zorgt ervoor dat een reeds voltooide taak niet opnieuw moet worden opgestart bij een potentiële crash of reboot halverwege.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

Het uitwaaierwerk wordt verdeeld over meerdere instanties van de F2-functie. Het werk wordt opgevolgd met een dynamische takenlijst. context.task_all-API wordt aangeroepen om te wachten tot alle aangeroepen functies voltooid zijn. Vervolgens wordt de F2-functie-uitvoer samengevoegd uit de dynamische takenlijst en doorgegeven naar de F3-functie.

Het automatisch plaatsen van controlepunten bij de yield-aanroep op context.task_all zorgt ervoor dat een reeds voltooide taak niet opnieuw moet worden opgestart bij een potentiële crash of reboot halverwege.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

Het fan-out werk wordt verdeeld over meerdere instanties van de F2-functie. Let op het gebruik van de NoWait switch bij het aanroepen van de F2 functie: met deze switch kan de orkestrator doorgaan met aanroepen van F2 zonder te wachten op voltooiing van de activiteit. Het werk wordt opgevolgd met een dynamische takenlijst. De opdracht Wait-ActivityFunction wordt aangeroepen om te wachten tot alle aangeroepen functies voltooid zijn. Vervolgens wordt de F2-functie-uitvoer samengevoegd uit de dynamische takenlijst en doorgegeven naar de F3-functie.

Het automatisch plaatsen van controlepunten bij de Wait-ActivityFunction-aanroep zorgt ervoor dat een reeds voltooide taak niet opnieuw moet worden opgestart bij een potentiële crash of reboot halverwege.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

Het fan-out werk wordt verdeeld over meerdere instanties van de F2 functie. Het werk wordt opgevolgd met een dynamische takenlijst. ctx.allOf(parallelTasks).await() wordt aangeroepen om te wachten tot alle aangeroepen functies voltooid zijn. Vervolgens worden de uitvoer van de F2 functie samengevoegd vanuit de lijst met dynamische taken en geretourneerd als uitvoer van de orchestratorfunctie.

De automatische controlepunten die plaatsvinden bij het aanroepen van .await() op ctx.allOf(parallelTasks) zorgen ervoor dat een onverwachte procesherstart geen reeds voltooide taken opnieuw hoeft te starten.

Notitie

In zeldzame gevallen is het mogelijk dat er een crash optreedt binnen de tijdsperiode nadat een activiteitsfunctie wordt voltooid maar voordat de voltooiing wordt opgeslagen in de orkestratiegeschiedenis. Als dit gebeurt, wordt de activiteitsfunctie opnieuw uitgevoerd vanaf het begin nadat het proces is hersteld.

Patroon 3: Asynchrone HTTP-API's

Het asynchrone HTTP API-patroon biedt een oplossing voor het probleem van de coördinatie van langdurige bewerkingen met externe clients. Een gebruikelijke manier om dit patroon te implementeren is door de langdurige actie te laten activeren door een HTTP-eindpunt. Leid de client dan om naar een statuseindpunt dat de client bevraagt om te weten te komen wanneer de bewerking is voltooid.

Diagram met het HTTP-API-patroon.

Durable Functions biedt ingebouwde ondersteuning voor dit patroon, waardoor u minder complexe of zelfs helemaal geen code moet schrijven om te communiceren met de uitvoering van langdurige functies. In de quickstartvoorbeelden voor Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell en Java) ziet u bijvoorbeeld een eenvoudige REST-opdracht die u kunt gebruiken om nieuwe orchestratorfunctie-exemplaren te starten. Nadat een instantie is gestart, biedt de extensie webhook HTTP-API's die de status van de orchestrator-functie opvragen.

Het volgende voorbeeld geeft REST-opdrachten weer die een orchestrator starten en de status opvragen. Voor de duidelijkheid zijn bepaalde protocoldetails weggelaten uit het voorbeeld.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Aangezien de Durable Functions-runtime de status voor u beheert, moet u geen eigen mechanisme voor het opvolgen van de status meer implementeren.

De Durable Functions-extensie maakt ingebouwde HTTP API's beschikbaar die langlopende orkestraties beheren. U kunt dit patroon ook zelf implementeren met behulp van uw eigen functietriggers (zoals HTTP, een wachtrij of Azure Event Hubs) en de duurzame clientbinding. Zo kunt u bijvoorbeeld een wachtrijbericht gebruiken om beëindiging te activeren. U kunt ook een HTTP-trigger gebruiken die wordt beveiligd door een Microsoft Entra-verificatiebeleid in plaats van de ingebouwde HTTP-API's die gebruikmaken van een gegenereerde sleutel voor verificatie.

Lees het artikel HTTP-functies voor meer informatie over hoe u asynchrone, langdurende processen via HTTP kunt weergeven met behulp van de Durable Functions-extensie.

Patroon 4: Monitoren

Het monitorpatroon verwijst naar een flexibel, terugkerend proces in een werkstroom. Een voorbeeld daarvan is peilen totdat er aan specifieke voorwaarden is voldaan. U kunt een gewone timertrigger gebruiken voor een basisscenario, zoals een periodieke opschoningstaak, maar het interval is statisch en het beheer van de levensduur wordt complex. U kunt Durable Functions gebruiken om flexibele, terugkerende intervallen te creëren, de levensduur van taken te beheren en meerdere bewakingsprocessen maken vanuit één indeling.

Een voorbeeld van het bewakingspatroon is om het eerdere asynchrone HTTP API-scenario om te keren. In plaats van een eindpunt beschikbaar te maken waarmee een externe client een langdurige bewerking bewaakt, gebruikt de monitor voor langdurige bewaking een extern eindpunt en wacht deze tot de status verandert.

Diagram met het monitorpatroon.

Met enkele regels code kunt u Durable Functions gebruiken om meerdere bewakingen te maken die willekeurige eindpunten observeren. De monitoren kunnen de uitvoering stopzetten wanneer er aan een voorwaarde is voldaan, of een andere functie kan de duurzame orkestratieclient gebruiken om de monitoren te beëindigen. U kunt het wait-interval van een monitor veranderen op basis van een specifieke voorwaarde (bijvoorbeeld exponentiële terugslag).

Met de volgende code wordt een standaardbewaking geïmplementeerd:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

Wanneer een verzoek is ontvangen, wordt er een nieuwe orkestratie-instantie gemaakt voor die taak-ID. De instantie peilt een status totdat aan een voorwaarde wordt voldaan of totdat een time-out verloopt. Een duurzame timer controleert het polling-interval. Vervolgens kan er meer werk worden uitgevoerd of kan de orkestratie beëindigd worden.

Patroon 5: Menselijke interactie

Veel geautomatiseerde processen bevatten een menselijke tussenkomst. Mensen betrekken bij een geautomatiseerd proces is lastig, want mensen zijn niet zo beschikbaar en responsief als cloudservices. Een geautomatiseerd proces kan deze interactie mogelijk toestaan met behulp van time-outs en compensatielogica.

Een goedkeuringsproces is een voorbeeld van een bedrijfsproces waar menselijke tussenkomst aan te pas komt. Zo kan de goedkeuring van een manager vereist zijn voor een onkostennota die een bepaald bedrag overschrijdt. Als de manager de onkostendeclaratie niet binnen 72 uur goedkeurt (de manager is misschien op vakantie gegaan), wordt een escalatieproces gestart om de goedkeuring van iemand anders te krijgen (misschien de manager van de manager).

Diagram van het menselijke interactiepatroon.

U kunt het patroon in dit voorbeeld implementeren met behulp van een orchestrator-functie. De orchestrator gebruikt een duurzame timer om goedkeuring te vragen. De orkestrator escaleert als er een timeout plaatsvindt. De orchestrator wacht op een externe gebeurtenis zoals een melding die gegenereerd wordt door een menselijke tussenkomst.

In deze voorbeelden wordt een goedkeuringsproces gemaakt om het menselijke tussenkomstpatroon te demonstreren:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Roep context.CreateTimer aan om de duurzame timer te maken. De melding wordt ontvangen door context.WaitForExternalEvent. Task.WhenAny wordt vervolgens aangeroepen om te bepalen of er een escalatie moet plaatsvinden (wanneer de time-out eerst gebeurt) of dat de goedkeuring moet worden verwerkt (wanneer de goedkeuring vóór de time-out wordt ontvangen).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Roep context.df.createTimer aan om de duurzame timer te maken. De melding wordt ontvangen door context.df.waitForExternalEvent. Vervolgens wordt context.df.Task.any aangeroepen om te bepalen of we moeten escaleren (de time-out gebeurt eerst) of de goedkeuring moeten verwerken (de goedkeuring wordt vóór de time-out ontvangen).

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Roep context.create_timer aan om de duurzame timer te maken. De melding wordt ontvangen door context.wait_for_external_event. Vervolgens wordt context.task_any aangeroepen om te bepalen of er een escalatie moet plaatsvinden (de time-out eerst plaatsvindt) of dat de goedkeuring moet worden verwerkt (de goedkeuring wordt namelijk vóór de time-out ontvangen).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Roep Start-DurableTimer aan om de duurzame timer te maken. De melding wordt ontvangen door Start-DurableExternalEventListener. Wait-DurableTask Vervolgens wordt aangeroepen om te bepalen of er een escalatie moet plaatsvinden (er treedt eerst een time-out op) of wordt de goedkeuring verwerkt (de goedkeuring wordt vóór time-out ontvangen).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

Met de methodeaanroep ctx.waitForExternalEvent(...).await() wordt de orkestratie onderbroken totdat er een gebeurtenis met de naam ApprovalEvent wordt ontvangen, die een boolean payload heeft. Als de gebeurtenis wordt ontvangen, wordt een activiteitsfunctie aangeroepen om het goedkeuringsresultaat te verwerken. Als een dergelijke gebeurtenis echter niet wordt ontvangen voordat de timeout (72 uur) verloopt, wordt er een TaskCanceledException gegenereerd en wordt de Escalate activiteitsfunctie aangeroepen.

Notitie

Er worden geen kosten in rekening gebracht voor de tijd die is besteed aan het wachten op externe gebeurtenissen bij het uitvoeren van het verbruiksabonnement.

Een externe client kan de gebeurtenismelding afleveren aan een wachtende orchestrator-functie met behulp van de ingebouwde HTTP API's:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

In dezelfde functie-app kan een gebeurtenis ook worden gegenereerd met behulp van de durable orchestration client uit een andere functie.

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Patroon 6: Aggregator (status houdende entiteiten)

Het zesde patroon is het samenvoegen van gebeurtenisgegevens gedurende een bepaalde periode in één enkele, adresseerbare entiteit. In dit patroon kunnen de gegevens die worden samengevoegd afkomstig zijn van meerdere bronnen, in batches worden geleverd of gedurende lange tijd worden verspreid. De aggregator moet mogelijk actie ondernemen op gebeurtenisgegevens wanneer deze binnenkomen en externe clients moeten mogelijk query's uitvoeren op de geaggregeerde gegevens.

Diagram dat een aggregator toont.

Wat de implementatie van dit patroon met normale, staatloze functies lastig maakt is dat het beheer van gelijktijdigheid erg moeilijk wordt. U hoeft zich niet alleen zorgen te maken over meerdere threads die dezelfde gegevens tegelijkertijd wijzigen, maar u moet zich ook zorgen maken dat de aggregator alleen op één virtuele machine tegelijk wordt uitgevoerd.

U kunt Duurzame entiteiten gebruiken om dit patroon eenvoudig als een enkele functie te implementeren.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Duurzame entiteiten kunnen ook gemodelleerd worden als klassen in .NET. Dit model kan handig zijn als de lijst met bewerkingen vaststaat en groot wordt. Het volgende voorbeeld is een equivalente implementatie van de Counter-entiteit met .NET-klassen en -methodes.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Notitie

Duurzame entiteiten worden momenteel niet ondersteund in PowerShell.

Notitie

Duurzame entiteiten worden momenteel niet ondersteund in Java.

Clients kunnen bewerkingen voor een entiteitsfunctie (ook wel signalering genoemd) in de wachtrij zetten met behulp van de entiteitsclientbinding.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Notitie

Dynamisch gegenereerde proxy's zijn ook beschikbaar in .NET om entiteiten op typebeveiligde wijze te signaleren. Naast signaleren kunnen clients ook de status van een entiteitsfunctie opvragen met typesafe methodes op de orchestratieclientbinding.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Entiteitsfuncties zijn beschikbaar in Durable Functions 2.0 en hoger voor C#, JavaScript en Python.

De technologie

Achter de schermen is de Durable Functions-extensie gebouwd op het Durable Task Framework, een opensource-bibliotheek in GitHub die gebruikt wordt om werkstromen te bouwen in code. Net zoals Azure Functions de serverloze evolutie van Azure WebJobs is, is Durable Functions de serverloze evolutie van het Durable Task Framework. Microsoft en andere organisaties gebruiken het Durable Task Framework om bedrijfskritieke processen te automatiseren. Het sluit naadloos aan op de serverloze Azure Functions-omgeving.

Codebeperkingen

Om betrouwbare en langdurige uitvoeringsgaranties te bieden, beschikken orchestrator-functies over een set coderegels die gevolg moeten worden. Zie Codebeperkingen voor Orchestrator-functies voor meer informatie.

Facturering

Durable Functions wordt gefactureerd op dezelfde wijze als Azure Functions. Zie Prijzen voor Azure Functions voor meer informatie. Wanneer u orchestratorfuncties uitvoert in een Azure Functions Consumption-abonnement, moet u rekening houden met enkele factureringsgedragen. Zie het artikel Durable Functions-facturering voor meer informatie hierover.

Duik er meteen in

U kunt in minder dan 10 minuten aan de slag gaan met Durable Functions door een van deze taalspecifieke quickstart-zelfstudies te volgen:

In deze quickstarts maakt en test u lokaal een duurzame Hallo wereld-functie . Vervolgens publiceert u de functiecode op Azure. De functie die u maakt, coördineert en koppelt aanroepen naar andere functies.

Publicaties

Durable Functions is ontwikkeld in samenwerking met Microsoft Research. Hierdoor produceert het Durable Functions-team actief onderzoeksdocumenten en artefacten; dit zijn onder andere:

Videodemo

De volgende video laat de voordelen van Durable Functions zien:

Andere orkestratieopties

Durable Functions is een geavanceerde extensie voor Azure Functions en is mogelijk niet geschikt voor alle toepassingen. Zie voor een vergelijking met andere Azure-orkestratietechnologieën Compare Azure Functions and Azure Logic Apps.