Udostępnij za pośrednictwem


Co to są funkcje trwałe?

Durable Functions to funkcja usługi Azure Functions, która umożliwia pisanie funkcji stanowych w bezserwerowym środowisku obliczeniowym. Rozszerzenie umożliwia definiowanie stanowych przepływów pracy przez pisanie funkcji orkiestratora i jednostek stanowych przez pisanie funkcji jednostek przy użyciu modelu programowania usługi Azure Functions. W tle rozszerzenie zarządza stanem, punktami kontrolnymi i ponownymi uruchomieniami, co pozwala skupić się na logice biznesowej.

Obsługiwane języki

Rozszerzenie Durable Functions jest przeznaczone do pracy ze wszystkimi językami programowania usługi Azure Functions, ale może mieć różne minimalne wymagania dla każdego języka. W poniższej tabeli przedstawiono minimalne obsługiwane konfiguracje aplikacji:

Stos technologii językowych Wersje środowiska uruchomieniowego usługi Azure Functions Wersja dla pracowników językowych Minimalna wersja pakietów
.NET/C# /F# Funkcje w wersji 1.0 lub nowszej W toku
Poza procesem
nie dotyczy
JavaScript/TypeScript (model w wersji 3) Funkcje w wersji 2.0 lub nowszej Węzeł 8+ Pakiety 2.x
JavaScript/TypeScript (model programowania w wersji 4) Funkcje 4.25+ Węzeł 18+ 3.15+ pakiety
Python Funkcje w wersji 2.0 lub nowszej Python 3.7+ Pakiety 2.x
Python (model programowania wersja 2) Funkcje w wersji 4.0 lub nowszej Python 3.7+ 3.15+ pakiety
PowerShell Funkcje w wersji 3.0 lub nowszej PowerShell 7+ Pakiety 2.x
Java Funkcje w wersji 4.0 lub nowszej Java 8+ Pakiety 4.x

Ważne

W tym artykule używane są zakładki do obsługi wielu wersji modelu programowania Node.js. Model w wersji 4 jest ogólnie dostępny i ma bardziej elastyczne i intuicyjne środowisko dla deweloperów języka JavaScript i Języka TypeScript. Aby uzyskać więcej informacji na temat sposobu działania modelu w wersji 4, zapoznaj się z przewodnikiem dewelopera dotyczącym usługi Azure Functions Node.js. Aby dowiedzieć się więcej o różnicach między wersjami 3 i v4, zapoznaj się z przewodnikiem migracji.

Ważne

W tym artykule są używane karty do obsługi wielu wersji modelu programowania w języku Python. Model w wersji 2 jest ogólnie dostępny i ma na celu zapewnienie bardziej skoncentrowanego na kodzie sposobu tworzenia funkcji za pośrednictwem dekoratorów. Aby uzyskać więcej informacji na temat modelu w wersji 2, zobacz Przewodnik dla deweloperów języka Python usługi Azure Functions.

Podobnie jak Azure Functions, istnieją szablony ułatwiające opracowywanie Durable Functions przy użyciu programu Visual Studio, programu Visual Studio Code i portalu Azure.

Wzorce aplikacji

Podstawowym zastosowaniem rozszerzenia Durable Functions jest uproszczenie złożonych wymagań związanych z koordynacją stanową w aplikacjach bezserwerowych. W poniższych sekcjach opisano typowe wzorce aplikacji, które mogą korzystać z rozszerzenia Durable Functions:

Wzorzec nr 1: Tworzenie łańcucha funkcji

We wzorcu tworzenia łańcucha funkcji sekwencja funkcji jest wykonywana w określonej kolejności. W tym wzorcu dane wyjściowe jednej funkcji są stosowane do danych wejściowych innej funkcji. Użycie kolejek między każdą funkcją gwarantuje, że system pozostanie trwały i skalowalny, mimo że istnieje przepływ kontroli z jednej funkcji do następnej.

Diagram wzorca tworzenia łańcucha funkcji.

Za pomocą rozszerzenia Durable Functions można zaimplementować zwięzły wzorzec tworzenia łańcucha funkcji, jak pokazano w poniższym przykładzie.

W tym przykładzie wartości F1, F2, F3i F4 są nazwami innych funkcji w tej samej aplikacji funkcji. Przepływ sterowania można zaimplementować przy użyciu normalnych konstrukcji kodowania imperatywnego. Kod jest wykonywany z góry w dół. Kod może obejmować istniejącą semantykę przepływu sterowania w języku, na przykład instrukcje warunkowe i pętle. Logikę obsługi błędów można uwzględnić w try/catch/finally blokach.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

Można użyć parametru context do wywoływania innych funkcji według nazwy, przekazywania parametrów i zwracania danych wyjściowych funkcji. Za każdym razem, gdy kod wywołuje await, platforma Durable Functions zapisuje postęp bieżącego wystąpienia funkcji. Jeśli proces lub maszyna wirtualna zostaną zrestartowane w trakcie wykonywania, wystąpienie funkcji zostanie wznowione od poprzedniego wywołania await. Aby uzyskać więcej informacji, zobacz następną sekcję Pattern #2: Fan-out/fan-in.

W tym przykładzie wartości F1, F2, F3i F4 są nazwami innych funkcji w tej samej aplikacji funkcji. Przepływ sterowania można zaimplementować przy użyciu normalnych konstrukcji kodowania imperatywnego. Kod jest wykonywany z góry w dół. Kod może obejmować istniejącą semantykę przepływu sterowania w języku, na przykład instrukcje warunkowe i pętle. Logikę obsługi błędów można uwzględnić w try/catch/finally blokach.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

Możesz użyć obiektu context.df do wywoływania innych funkcji po nazwie, przekazywać parametry i zwracać dane wyjściowe funkcji. Za każdym razem, gdy kod wywołuje yield, platforma Durable Functions zapisuje stan postępu bieżącego wystąpienia funkcji. Jeśli proces lub maszyna wirtualna zostaną zresetowane w trakcie wykonywania, wystąpienie funkcji zostanie wznowione od poprzedniego wywołania yield. Aby uzyskać więcej informacji, przejrzyj następną sekcję Pattern #2: Fan-out/fan-in.

Uwaga

Obiekt context w języku JavaScript reprezentuje cały kontekst funkcji. Uzyskaj dostęp do kontekstu Durable Functions, korzystając z właściwości df w kontekście głównym.

W tym przykładzie wartości F1, F2, F3i F4 są nazwami innych funkcji w tej samej aplikacji funkcji. Przepływ sterowania można zaimplementować przy użyciu normalnych konstrukcji kodowania imperatywnego. Kod jest wykonywany z góry w dół. Kod może obejmować istniejącą semantykę przepływu sterowania języka, na przykład instrukcje warunkowe i pętle.

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result

main = df.Orchestrator.create(orchestrator_function)

Możesz użyć obiektu context do wywoływania innych funkcji według nazwy, przekazywania parametrów i zwracania wyniku funkcji. Za każdym razem, gdy kod wywołuje yield element, platforma Durable Functions rejestruje postęp wystąpienia bieżącej funkcji. Jeśli proces lub maszyna wirtualna zostaną zrestartowane w połowie wykonywania, wystąpienie funkcji zostanie wznowione od poprzedniego wywołania yield. Aby uzyskać więcej informacji, zobacz następną sekcję „Schemat #2: Fan-out/fan-in”.

Uwaga

context Obiekt w języku Python reprezentuje kontekst orkiestracji. Uzyskaj dostęp do głównego kontekstu Azure Functions, korzystając z właściwości function_context w kontekście orkiestracyjnym.

W tym przykładzie wartości F1, F2, F3i F4 są nazwami innych funkcji w tej samej aplikacji funkcji. Przepływ sterowania można zaimplementować przy użyciu normalnych konstrukcji kodowania imperatywnego. Kod jest wykonywany z góry w dół. Kod może obejmować istniejącą semantykę przepływu sterowania w języku, takie jak instrukcje warunkowe i pętle.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

Możesz użyć Invoke-DurableActivity polecenia , aby wywołać inne funkcje według nazwy, przekazać parametry i zwrócić dane wyjściowe funkcji. Za każdym razem, gdy kod wywołuje Invoke-DurableActivity bez przełącznika NoWait, framework Durable Functions zapisuje stan bieżącego wystąpienia funkcji. Jeśli proces lub maszyna wirtualna zostaną zresetowane w trakcie wykonywania, wystąpienie funkcji zostanie wznowione z poprzedniego wywołania Invoke-DurableActivity. Aby uzyskać więcej informacji, zobacz następną sekcję Pattern #2: Fan-out/fan-in.

W tym przykładzie wartości F1, F2, F3i F4 są nazwami innych funkcji w tej samej aplikacji funkcji. Przepływ sterowania można zaimplementować przy użyciu normalnych konstrukcji kodowania imperatywnego. Kod jest wykonywany z góry w dół. Kod może obejmować istniejącą semantykę przepływu sterowania, taką jak instrukcje warunkowe i pętle.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

Obiekt ctx umożliwia korzystanie z wywoływania innych funkcji według nazwy, przekazywania parametrów i zwracania danych wyjściowych funkcji. Dane wyjściowe tych metod to obiekt, w którym Task<V> jest typem V danych zwracanych przez wywołaną funkcję. Za każdym razem, gdy wywołujesz Task<V>.await(), platforma Durable Functions zapisuje postęp bieżącej instancji funkcji. Jeśli proces nieoczekiwanie odtwarza się w połowie wykonywania, wystąpienie funkcji zostanie wznowione z poprzedniego wywołania Task<V>.await() . Aby uzyskać więcej informacji, zobacz następną sekcję zatytułowaną Pattern #2: Fan-out/fan-in.

Wzorzec nr 2: Fan-out/fan-in

We wzorcu fan-out/fan-in wykonujesz wiele funkcji równolegle, a następnie czekasz na zakończenie wszystkich funkcji. Często pewne prace związane z agregacją są wykonywane na wynikach zwracanych przez funkcje.

Diagram wzorca rozdzielania i scalania zadań.

Dzięki normalnym funkcjom można wymyślić możliwość wysyłania wielu komunikatów do kolejki przez funkcję . Wrócenie do poprzedniego stanu jest znacznie trudniejsze. Aby złączyć dane, w normalnej funkcji piszesz kod do śledzenia, kiedy kończą się funkcje wyzwalane przez kolejkę, a następnie przechowujesz wyniki funkcji.

Rozszerzenie Durable Functions obsługuje ten wzorzec za pomocą stosunkowo prostego kodu:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

Praca rozdzielania jest dystrybuowana do wielu instancji funkcji F2. Praca jest śledzona przy użyciu dynamicznej listy zadań. Task.WhenAll jest wywoływana, aby poczekać na zakończenie wszystkich wywoływanych funkcji. F2 Następnie dane wyjściowe funkcji są agregowane z listy zadań dynamicznych i przekazywane do F3 funkcji.

Automatyczne tworzenie punktów kontrolnych, które odbywa się przy await wywołaniu Task.WhenAll , gwarantuje, że potencjalna awaria lub ponowny rozruch w połowie drogi nie wymaga ponownego uruchomienia już ukończonego zadania.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

Praca nad wentylatorem jest dystrybuowana do wielu wystąpień F2 funkcji. Praca jest śledzona przy użyciu dynamicznej listy zadań. context.df.Task.all Interfejs API jest wywoływany, aby poczekać na zakończenie wszystkich wywoływanych funkcji. F2 Następnie dane wyjściowe funkcji są agregowane z listy zadań dynamicznych i przekazywane do F3 funkcji.

Automatyczne tworzenie punktów kontrolnych, które odbywa się przy yield wywołaniu context.df.Task.all , gwarantuje, że potencjalna awaria lub ponowny rozruch w połowie drogi nie wymaga ponownego uruchomienia już ukończonego zadania.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

Praca rozproszona jest przekazywana do wielu wystąpień funkcji F2. Praca jest śledzona przy użyciu dynamicznej listy zadań. context.task_all Interfejs API jest wywoływany, aby poczekać, aż wszystkie wywołane funkcje zostaną zakończone. F2 Następnie dane wyjściowe funkcji są agregowane z listy zadań dynamicznych i przekazywane do F3 funkcji.

Automatyczne tworzenie punktów kontrolnych, które odbywa się przy yield wywołaniu context.task_all , gwarantuje, że potencjalna awaria lub ponowny rozruch w połowie drogi nie wymaga ponownego uruchomienia już ukończonego zadania.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

Praca nad wentylatorem jest dystrybuowana do wielu wystąpień F2 funkcji. Zwróć uwagę na użycie przełącznika NoWait w wywołaniu funkcji F2: ten przełącznik umożliwia orkiestratorowi kontynuowanie wywoływania F2 bez oczekiwania na ukończenie aktywności. Praca jest śledzona przy użyciu dynamicznej listy zadań. Polecenie Wait-ActivityFunction jest wywoływane, aby poczekać na zakończenie wszystkich wywoływanych funkcji. F2 Następnie dane wyjściowe funkcji są agregowane z listy zadań dynamicznych i przekazywane do F3 funkcji.

Automatyczne tworzenie punktów kontrolnych, które odbywa się podczas wywołania Wait-ActivityFunction, gwarantuje, że potencjalna awaria lub ponowny rozruch w trakcie nie wymaga ponownego uruchomienia już ukończonego zadania.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

Praca nad wentylatorem jest dystrybuowana do wielu wystąpień F2 funkcji. Praca jest śledzona przy użyciu dynamicznej listy zadań. ctx.allOf(parallelTasks).await() jest wywoływana, aby poczekać na zakończenie wszystkich wywoływanych funkcji. F2 Następnie dane wyjściowe funkcji są agregowane z listy zadań dynamicznych i zwracane jako dane wyjściowe funkcji orkiestratora.

Automatyczne tworzenie punktów kontrolnych, które odbywa się podczas wywołania .await(), ctx.allOf(parallelTasks) gwarantuje, że nieoczekiwane ponowne uruchomienie procesu nie wymaga ponownego uruchomienia żadnych już ukończonych zadań.

Uwaga

W rzadkich okolicznościach możliwe jest, że awaria może wystąpić w przedziale czasowym po zakończeniu funkcji działania, ale zanim jej zakończenie zostanie zapisane w historii orkiestracji. W takim przypadku funkcja aktywności zostanie uruchomiona ponownie od początku po odzyskaniu procesu.

Wzorzec nr 3: asynchroniczne interfejsy API HTTP

Wzorzec asynchronicznego interfejsu API HTTP rozwiązuje problem koordynowania stanu długotrwałych operacji z klientami zewnętrznymi. Typowym sposobem zaimplementowania tego wzorca jest posiadanie punktu końcowego HTTP wyzwalającego długotrwałą akcję. Następnie przekieruj klienta do punktu końcowego stanu, który klient sonduje, aby dowiedzieć się, kiedy operacja zostanie zakończona.

Diagram przedstawiający wzorzec interfejsu API HTTP.

Rozszerzenie Durable Functions oferuje wbudowaną obsługę tego wzorca, upraszczając lub nawet eliminując potrzebę pisania kodu do interakcji z długotrwale działającymi funkcjami. Na przykład przykłady szybkiego uruchomienia rozszerzenia Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell i Java) pokazują proste polecenie REST, którego można użyć do uruchamiania nowych wystąpień funkcji orkiestratora. Po uruchomieniu wystąpienia rozszerzenie uwidacznia interfejsy API HTTP elementu webhook, które wysyłają zapytanie do stanu funkcji orkiestratora.

W poniższym przykładzie przedstawiono polecenia REST, które uruchamiają koordynatora i wysyłają zapytania dotyczące jego stanu. W celu zapewnienia przejrzystości niektóre szczegóły protokołu zostaną pominięte w przykładzie.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Ponieważ środowisko uruchomieniowe Durable Functions zarządza stanem, nie trzeba implementować własnego mechanizmu śledzenia stanu.

Rozszerzenie Durable Functions udostępnia wbudowane interfejsy API HTTP, które zarządzają długotrwałymi orkiestracjami. Możesz też samodzielnie zaimplementować ten wzorzec przy użyciu własnych wyzwalaczy funkcji (takich jak HTTP, kolejka lub Usługa Azure Event Hubs) oraz trwałe powiązanie klienta. Na przykład, możesz użyć wiadomości w kolejce do zainicjowania zakończenia. Możesz też użyć wyzwalacza HTTP chronionego przez zasady uwierzytelniania Microsoft Entra, zamiast wbudowanych interfejsów API HTTP, które używają wygenerowanego klucza do uwierzytelniania.

Aby uzyskać więcej informacji, zobacz artykuł funkcje protokołu HTTP, w którym wyjaśniono, jak można eksponować asynchroniczne, długotrwałe procesy za pośrednictwem protokołu HTTP przy użyciu rozszerzenia Durable Functions.

Wzorzec nr 4: Monitor

Wzorzec monitora odnosi się do elastycznego, cyklicznego procesu w przepływie pracy. Przykładem jest sondowanie do momentu spełnienia określonych warunków. Użycie regularnego wyzwalacza czasowego umożliwia rozwiązanie podstawowego scenariusza, takiego jak okresowe zadanie oczyszczania, ale jego interwał jest statyczny i zarządzanie czasem życia instancji staje się złożone. Za pomocą Durable Functions można tworzyć elastyczne interwały powtarzania, zarządzać czasem życia zadań i tworzyć wiele procesów monitorowania na podstawie jednej orkiestracji.

Przykładem wzorca monitorowego jest zamiana poprzedniego scenariusza z asynchronicznym interfejsem API HTTP. Zamiast udostępniania punktu końcowego dla klienta zewnętrznego w celu monitorowania długotrwałej operacji, monitor długoterminowy korzysta z zewnętrznego punktu końcowego, a następnie czeka na zmianę stanu.

Diagram przedstawiający wzorzec monitora.

W kilku wierszach kodu można użyć rozszerzenia Durable Functions, aby utworzyć wiele monitorów, które obserwują dowolne punkty końcowe. Monitory mogą zakończyć wykonywanie po spełnieniu warunku lub inna funkcja może użyć klienta trwałej aranżacji w celu zakończenia monitorów. Interwał monitora wait można zmienić na podstawie określonego warunku (na przykład wycofywania wykładniczego).

Poniższy kod implementuje podstawowy monitor:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

Gdy zostanie odebrane żądanie, tworzona jest nowa instancja orkiestracji dla tego identyfikatora zadania. Wystąpienie sprawdza stan, aż zostanie spełniony warunek lub wygaśnie limit czasu. Trwały czasomierz kontroluje interwał sondowania. Następnie można wykonać więcej pracy lub orkiestracja może się zakończyć.

Wzorzec nr 5: Interakcja z człowiekiem

Wiele zautomatyzowanych procesów obejmuje jakąś interakcję człowieka. Zaangażowanie ludzi w zautomatyzowany proces jest trudne, ponieważ ludzie nie są tak wysoko dostępni i reagujący jak usługi w chmurze. Zautomatyzowany proces może umożliwić tę interakcję przy użyciu limitów czasu i logiki kompensacji.

Proces zatwierdzania to przykład procesu biznesowego, który obejmuje interakcję człowieka. Zatwierdzenie przez menedżera może być wymagane dla raportu wydatków, który przekracza określoną kwotę w dolarach. Jeśli menedżer nie zatwierdzi raportu wydatków w ciągu 72 godzin (menedżer może być na wakacjach), proces eskalacji zostaje uruchomiony, aby uzyskać zgodę od kogoś innego (być może przełożonego menedżera).

Diagram wzorca interakcji człowieka.

Wzorzec w tym przykładzie można zaimplementować przy użyciu funkcji orkiestratora. Orkiestrator używa trwałego czasomierza do żądania zatwierdzenia. Orkiestrator eskaluje, jeśli wystąpi przekroczenie limitu czasu. Orkiestrator czeka na zdarzenie zewnętrzne, takie jak powiadomienie wygenerowane przez interakcję człowieka.

Te przykłady umożliwiają utworzenie procesu zatwierdzania w celu zademonstrowania wzorca interakcji człowieka:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Aby utworzyć trwały czasomierz, wywołaj metodę context.CreateTimer. Powiadomienie jest odbierane przez element context.WaitForExternalEvent. Następnie wywoływana jest Task.WhenAny aby zdecydować, czy najpierw nastąpi przekroczenie limitu czasu (eskalacja) czy też zostanie przetworzone zatwierdzenie (zatwierdzenie odbierane jest przed przekroczeniem limitu czasu).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Aby utworzyć trwały czasomierz, wywołaj metodę context.df.createTimer. Powiadomienie jest odbierane przez context.df.waitForExternalEvent. Następnie context.df.Task.any jest wywoływane, aby zdecydować, czy eskalować (najpierw nastąpi limit czasu) lub dokonać zatwierdzenia (zatwierdzenie jest odbierane przed limitem czasu).

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Aby utworzyć trwały czasomierz, wywołaj metodę context.create_timer. Powiadomienie jest odbierane przez context.wait_for_external_event. Następnie wywoływana jest funkcja context.task_any, aby zdecydować, czy eskalować problem (jeśli najpierw nastąpi przekroczenie limitu czasu), lub przetworzyć zatwierdzenie (jeśli zostanie ono odebrane przed upływem limitu czasu).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Aby utworzyć trwały czasomierz, wywołaj metodę Start-DurableTimer. Powiadomienie jest odbierane przez Start-DurableExternalEventListener. Następnie Wait-DurableTask jest wywoływany, aby zdecydować, czy eskalować (jeśli najpierw wystąpi przekroczenie limitu czasu) lub przetworzyć akceptację (jeśli akceptacja zostanie otrzymana przed przekroczeniem limitu czasu).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

Wywołanie ctx.waitForExternalEvent(...).await() metody wstrzymuje orkiestrację do momentu odebrania zdarzenia o nazwie ApprovalEvent, które ma boolean ładunek. Jeśli zdarzenie zostanie odebrane, zostanie wywołana funkcja działania, aby przetworzyć wynik zatwierdzenia. Jeśli jednak takie zdarzenie nie zostanie odebrane przed wygaśnięciem timeout (72 godziny), zostanie podjęte działanie TaskCanceledException, a funkcja aktywności Escalate zostanie wywołana.

Uwaga

Nie są naliczane opłaty za czas oczekiwania na zdarzenia zewnętrzne podczas korzystania z planu konsumpcji.

Klient zewnętrzny może dostarczyć powiadomienie o zdarzeniu do oczekującej funkcji orkiestratora przy użyciu wbudowanych interfejsów API protokołu HTTP.

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

Zdarzenie można również wywołać za pomocą klienta trwałej orkiestracji z innej funkcji w obrębie tej samej aplikacji funkcyjnej.

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Wzorzec nr 6: Agregator (jednostki stanowe)

Szósty wzorzec dotyczy agregowania danych zdarzeń w danym okresie do pojedynczej, adresowalnej jednostki. W tym wzorcu agregowane dane mogą pochodzić z wielu źródeł, mogą być dostarczane w partiach lub mogą być rozproszone w długich okresach czasu. Agregator może wymagać podjęcia działań dotyczących danych zdarzenia w miarę ich nadejścia, a klienci zewnętrzni mogą wymagać wykonania zapytań dotyczących zagregowanych danych.

Diagram przedstawiający agregator.

Trudność w próbie zaimplementowania tego wzorca za pomocą zwykłych funkcji bezstanowych polega na tym, że kontrola współbieżności staje się ogromnym wyzwaniem. Nie tylko musisz martwić się o wiele wątków modyfikujących te same dane w tym samym czasie, należy również martwić się o zapewnienie, że agregator działa tylko na jednej maszynie wirtualnej jednocześnie.

Za pomocą jednostek Durable można łatwo zaimplementować ten wzorzec jako pojedynczą funkcję.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Jednostki trwałe można również modelować jako klasy na platformie .NET. Ten model może być przydatny, jeśli lista operacji jest stała i staje się duża. Poniższy przykład to równoważna implementacja Counter jednostki przy użyciu klas i metod platformy .NET.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Uwaga

Jednostki trwałe nie są obecnie obsługiwane w programie PowerShell.

Uwaga

Jednostki trwałe nie są obecnie obsługiwane w języku Java.

Klienci mogą kolejkować operacje (znane również jako sygnalizowanie) funkcji jednostki przy użyciu powiązania klienta jednostki.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Uwaga

Dynamicznie generowane serwery proxy są również dostępne na platformie .NET na potrzeby sygnalizowania jednostek w bezpieczny sposób. Oprócz sygnalizowania klienci mogą również wykonywać zapytania dotyczące stanu funkcji jednostki przy użyciu metod bezpiecznych typu w powiązaniu klienta orkiestracji.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Funkcje jednostek są dostępne w Durable Functions 2.0 i nowszych dla języków C#, JavaScript i Python.

Technologia

W tle rozszerzenie Durable Functions jest oparte na strukturze Durable Task Framework— bibliotece typu open source w usłudze GitHub używanej do tworzenia przepływów pracy w kodzie. Podobnie jak usługa Azure Functions to bezserwerowa ewolucja zadań WebJob platformy Azure, rozszerzenie Durable Functions to bezserwerowa ewolucja platformy Durable Task Framework. Firma Microsoft i inne organizacje intensywnie używają platformy Durable Task Framework do automatyzowania procesów o znaczeniu krytycznym. Jest to naturalne rozwiązanie dla bezserwerowego środowiska usługi Azure Functions.

Ograniczenia kodu

Aby zapewnić niezawodne i długotrwałe gwarancje wykonywania, funkcje orkiestratora mają zestaw reguł kodowania, które muszą być przestrzegane. Aby uzyskać więcej informacji, zobacz Ograniczenia kodu funkcji programu Orchestrator.

Rozliczenia

Usługa Durable Functions jest rozliczana tak samo jak usługa Azure Functions. Aby uzyskać więcej informacji, zobacz Cennik usługi Azure Functions. Podczas wykonywania funkcji orkiestratora w planie konsumpcyjnym usługi Azure Functions należy pamiętać o pewnych kwestiach związanych z rozliczaniem. Aby uzyskać więcej informacji na temat tych zachowań, zobacz artykuł Rozliczenia Durable Functions.

Rozpocznij od razu

Rozpocznij korzystanie z Durable Functions w niecałe 10 minut, kończąc jeden z tych językowo specyficznych samouczków typu szybki start:

W tych szybkich przewodnikach startowych utworzysz i przetestujesz lokalnie funkcję trwałą Hello world. Kod funkcji zostanie następnie opublikowany na platformie Azure. Utworzona przez Ciebie funkcja orkiestruje i łączy wywołania do innych funkcji.

Publikacje

Rozszerzenie Durable Functions jest opracowywane we współpracy z firmą Microsoft Research. W związku z tym zespół Durable Functions aktywnie tworzy dokumenty badawcze i artefakty; należą do nich:

Pokaz wideo

Poniższy klip wideo prezentuje zalety rozszerzenia Durable Functions:

Inne opcje aranżacji

Durable Functions to zaawansowane rozszerzenie usługi Azure Functions i może nie być odpowiednie dla wszystkich aplikacji. Aby zapoznać się z porównaniem technologii orkiestracji dostępnych na platformie Azure, zobacz Porównanie usług Azure Functions i Azure Logic Apps.