O que são as Durable Functions?

As Durable Functions são uma extensão do Azure Functions que permite escrever funções com estado em um ambiente de computação sem servidor. A extensão permite definir fluxos de trabalho com estado pela escrita de funções de orquestrador e entidades com estado pela escrita de funções de entidade usando o modelo de programação do Azure Functions. Nos bastidores, a extensão gerencia o estado, os pontos de verificação e as reinicializações para você, permitindo que você se concentre na lógica de negócios.

Idiomas compatíveis

O Durable Functions foi projetado para trabalhar com todas as linguagens de programação do Azure Functions, mas pode ter requisitos mínimos diferentes para cada uma delas. A tabela a seguir mostra as configurações mínimas de aplicativo com suporte:

Pilha de linguagem Versões do Azure Functions Runtime Versão do trabalho de linguagem Versão de pacotes mínimos
.NET/C#/F# Functions 1.0 ou mais recente Em processo
Fora do processo
n/d
JavaScript/TypeScript (modelo V3 prog.) Functions 2.0 ou mais recente Node 8 ou mais recente Pacotes 2.x
JavaScript/TypeScript (modelo prog. V4) Functions 4.25+ Node 18+ Pacotes 3.15 ou mais recente
Python Functions 2.0 ou mais recente Python 3.7 ou mais recente Pacotes 2.x
Python (modelo de programação V2 ) Functions 4.0 e posterior Python 3.7 ou mais recente Pacotes 3.15 ou mais recente
PowerShell Functions 3.0 ou mais recente PowerShell 7 ou mais recente Pacotes 2.x
Java Functions 4.0 e posterior Java 8 ou mais recente Pacotes 4.x

Importante

Este artigo usa guias para dar suporte a várias versões do modelo de programação Node.js. O modelo v4 normalmente está disponível e foi projetado para oferecer uma experiência mais flexível e intuitiva para desenvolvedores de JavaScript e TypeScript. Para obter mais detalhes sobre como funciona o modelo v4, consulte o Guia do desenvolvedor do Node.js para o Azure Functions. Para saber mais sobre as diferenças entre os modelos v3 e a v4, consulte o Guia de migração.

Importante

Este artigo usa guias para dar suporte a várias versões do modelo de programação do Python. O modelo v2 está em disponibilidade geral e foi projetado para fornecer uma maneira mais centrada em código de criar funções por meio de decoradores. Para obter mais detalhes sobre como funciona o modelo v2, consulte o Guia de desenvolvedor do Node.js para Azure Functions.

Assim como o Azure Functions, há modelos para ajudá-lo a desenvolver as Durable Functions usando o Visual Studio, o Visual Studio Code e o portal do Azure.

Padrões de aplicativo

O principal caso de uso das Durable Functions é simplificar requisitos complexos de coordenação com estado em aplicativos sem servidor. As seguintes seções descrevem padrões de aplicativo típicos que podem se beneficiar com as Durable Functions:

Padrão 1: encadeamento de funções

No padrão de encadeamento de funções, uma sequência de funções é executada em uma ordem específica. Nesse padrão, a saída de uma função é aplicada à entrada de outra função. O uso de filas entre cada função garante que o sistema permaneça durável e escalonável, mesmo que haja um fluxo de controle de uma função para outra.

A diagram of the function chaining pattern

Use as Durable Functions para implementar o padrão de encadeamento de funções de forma concisa, conforme mostrado no exemplo a seguir.

Nesse exemplo, os valores F1, F2, F3 e F4 são os nomes de outras funções no mesmo aplicativo de funções. Implemente o fluxo de controle usando construtos de codificação imperativa normal. O código é executado de cima para baixo. Ele pode envolver a semântica do fluxo de controle da linguagem existente, como condicionais e loops. Você pode incluir a lógica de tratamento de erro em blocos try/catch/finally.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

É possível usar o parâmetro context para invocar outras funções por nome, passar parâmetros e retornar a saída da função. Cada vez que o código chama await, a estrutura das Durable Functions cria um ponto de verificação para o progresso da instância da função atual. Se o processo ou a máquina virtual for reciclada no meio da execução, a instância da função retomará na chamada await anterior. Para obter mais informações, confira a próxima seção, Padrão nº2: Fan-out/fan-in.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

É possível usar o objeto context.df para invocar outras funções por nome, passar parâmetros e retornar a saída da função. Cada vez que o código chama yield, a estrutura das Durable Functions cria um ponto de verificação para o progresso da instância da função atual. Se o processo ou a máquina virtual for reciclada no meio da execução, a instância da função retomará na chamada yield anterior. Para obter mais informações, confira a próxima seção, Padrão nº2: Fan-out/fan-in.

Observação

O objeto context em JavaScript representa o contexto inteiro da função. Acesse o contexto das Durable Functions usando a propriedade df no contexto principal.

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result


main = df.Orchestrator.create(orchestrator_function)

É possível usar o objeto context para invocar outras funções por nome, passar parâmetros e retornar a saída da função. Cada vez que o código chama yield, a estrutura das Durable Functions cria um ponto de verificação para o progresso da instância da função atual. Se o processo ou a máquina virtual for reciclada no meio da execução, a instância da função retomará na chamada yield anterior. Para obter mais informações, confira a próxima seção, Padrão nº2: Fan-out/fan-in.

Observação

O objeto context no Python representa o contexto de orquestração. Acesse o contexto principal do Azure Functions usando a propriedade function_context no contexto de orquestração.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

Use o comando Invoke-DurableActivity para invocar outras funções por nome, transmitir parâmetros e retornar a saída da função. Toda vez que o código chama Invoke-DurableActivity sem a opção NoWait, a estrutura das Durable Functions cria um ponto de verificação do progresso da instância da função atual. Se o processo ou a máquina virtual for reciclada no meio da execução, a instância da função retomará na chamada Invoke-DurableActivity anterior. Para obter mais informações, confira a próxima seção, Padrão nº2: Fan-out/fan-in.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

É possível usar o objeto ctx para invocar outras funções por nome, passar parâmetros e retornar a saída da função. A saída dessas chamadas de método é um objeto Task<V> em que o tipo de dados retornados pela função invocada é V. Cada vez que você chama Task<V>.await(), a estrutura das Durable Functions cria um ponto de verificação para o progresso da instância da função atual. Se o processo reciclar inesperadamente no meio da execução, a instância da função retomará da chamada Task<V>.await()anterior. Para obter mais informações, confira a próxima seção, Padrão nº2: Fan-out/fan-in.

Padrão 2: fan-out/fan-in

No padrão fan-out/fan-in, execute várias funções em paralelo e, em seguida, aguarde a conclusão de todas as funções. Frequentemente, algum trabalho de agregação é feito nos resultados retornados pelas funções.

A diagram of the fan out/fan pattern

Com funções normais, realize fan-out fazendo com que a função envie várias mensagens para uma fila. No entanto, o processo de realizar fan-in é muito mais complexo. Para o fan-in, em uma função normal, você escreve o código a ser controlado quando as funções disparadas por fila terminam e, em seguida, armazena saídas da função.

A extensão Durable Functions cuida desse padrão com um código relativamente simples:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

O trabalho de fan-out é distribuído para várias instâncias da função F2. O trabalho é acompanhado usando uma lista dinâmica de tarefas. Task.WhenAll é chamada para aguardar até que todas as funções chamadas sejam concluídas. Em seguida, as saídas da função F2 são agregadas da lista de tarefas dinâmicas e passadas para a função F3.

A criação automática de ponto de verificação que ocorre na chamada await em Task.WhenAll garante que uma possível falha ou reinicialização no meio do processo não exija a reinicialização de uma tarefa já concluída.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

O trabalho de fan-out é distribuído para várias instâncias da função F2. O trabalho é acompanhado usando uma lista dinâmica de tarefas. A API context.df.Task.all é chamada para aguardar até que todas as funções chamadas sejam concluídas. Em seguida, as saídas da função F2 são agregadas da lista de tarefas dinâmicas e passadas para a função F3.

A criação automática de ponto de verificação que ocorre na chamada yield em context.df.Task.all garante que uma possível falha ou reinicialização no meio do processo não exija a reinicialização de uma tarefa já concluída.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

O trabalho de fan-out é distribuído para várias instâncias da função F2. O trabalho é acompanhado usando uma lista dinâmica de tarefas. A API context.task_all é chamada para aguardar até que todas as funções chamadas sejam concluídas. Em seguida, as saídas da função F2 são agregadas da lista de tarefas dinâmicas e passadas para a função F3.

A criação automática de ponto de verificação que ocorre na chamada yield em context.task_all garante que uma possível falha ou reinicialização no meio do processo não exija a reinicialização de uma tarefa já concluída.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

O trabalho de fan-out é distribuído para várias instâncias da função F2. Observe o uso da opção NoWait na invocação da função F2: essa opção permite que o orquestrador continue invocando F2 sem precisar aguardar a conclusão da atividade. O trabalho é acompanhado usando uma lista dinâmica de tarefas. O comando Wait-ActivityFunction é chamado para aguardar até todas as funções chamadas serem concluídas. Em seguida, as saídas da função F2 são agregadas da lista de tarefas dinâmicas e passadas para a função F3.

A criação automática de ponto de verificação que ocorre na chamada Wait-ActivityFunction garante que uma possível falha ou reinicialização no meio do processo não exija a reinicialização de uma tarefa já concluída.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

O trabalho de fan-out é distribuído para várias instâncias da função F2. O trabalho é acompanhado usando uma lista dinâmica de tarefas. ctx.allOf(parallelTasks).await() é chamada para aguardar até que todas as funções chamadas sejam concluídas. Em seguida, as saídas da função F2 são agregadas da lista de tarefas dinâmicas e retornadas como saída da função do orquestrador.

A criação automática de ponto de verificação que ocorre na chamada .await() em ctx.allOf(parallelTasks) garante que uma reciclagem de processo inesperada não exija a reinicialização de uma tarefa já concluída.

Observação

Em raras circunstâncias, é possível que uma falha ocorra na janela depois que uma função de atividade for concluída, mas antes de sua conclusão ser salva no histórico de orquestração. Se isso acontecer, a função de atividade será executada novamente desde o início depois que o processo for recuperado.

Padrão 3: APIs de HTTP assíncrono

O padrão de API HTTP assíncrona trata do problema de coordenar o estado de operações de execução longa com clientes externos. Uma maneira comum de implementar esse padrão é fazer com que um ponto de extremidade HTTP dispare a ação de execução longa. Em seguida, redirecione o cliente para um ponto de extremidade de status que é sondado pelo cliente para saber quando a operação é concluída.

A diagram of the HTTP API pattern

As Durable Functions fornecem suporte interno para esse padrão, simplificando ou, até mesmo, removendo o código que você precisa escrever para interagir com execuções de função de execução longa. Por exemplo, os exemplos de início rápido de Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell e Java) mostram um comando REST simples que você pode usar para iniciar novas instâncias de função do orquestrador. Depois que uma instância é iniciada, a extensão expõe as APIs HTTP de webhook que consultam o status da função de orquestrador.

O exemplo a seguir mostra os comandos REST que iniciam um orquestrador e consultam seu status. Para maior clareza, alguns detalhes do protocolo foram omitidos do exemplo.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Como o runtime das Durable Functions gerencia o estado para você, você não precisa implementar seu próprio mecanismo de acompanhamento de status.

A extensão Durable Functions expõe as APIs HTTP internas que gerenciam orquestrações de execução longa. Como alternativa, você pode implementar esse padrão usando seus próprios gatilhos de função (como HTTP, uma fila ou Hubs de Eventos do Azure) e a associação de clientes duráveis. Por exemplo, você pode usar uma mensagem da fila para disparar o encerramento. Ou você pode usar um gatilho HTTP protegido por uma política de autenticação do Microsoft Entra, em vez de APIs HTTP internas que usam uma chave gerada para autenticação.

Para obter mais informações, confira o artigo Recursos HTTP, que explica como você pode expor processos assíncronos de execução longa via HTTP usando a extensão das Durable Functions.

Padrão 4: Monitoramento

O padrão de monitor refere-se a um processo recorrente e flexível em um fluxo de trabalho. Um exemplo é fazer uma sondagem até que condições específicas sejam atendidas. Você pode usar um gatilho de temporizador normal para lidar com um cenário básico, como um trabalho de limpeza periódico, mas seu intervalo é estático e o gerenciamento do tempo de vida da instância torna-se complexo. Use as Durable Functions para criar intervalos de recorrência flexíveis, gerenciar os tempos de vida de tarefas e criar vários processos de monitor com base em uma única orquestração.

Um exemplo do padrão de monitor é reverter o cenário de API HTTP assíncrona anterior. Em vez de expor um ponto de extremidade para um cliente externo monitorar uma operação de execução longa, o monitor de execução longa consome um ponto de extremidade externo e, em seguida, aguarda uma alteração de estado.

A diagram of the monitor pattern

Em poucas linhas de código, você pode usar as Durable Functions para criar vários monitores que observam pontos de extremidade arbitrários. Os monitores podem encerrar a execução quando uma condição é atendida ou outra função pode usar o cliente de orquestração durável para encerrar os monitores. Você pode alterar o intervalo de wait de um monitor de acordo com uma condição específica (por exemplo, retirada exponencial).

O seguinte código implementa um monitor básico:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

Quando uma solicitação é recebida, uma nova instância de orquestração é criada para essa ID do trabalho. A instância sonda um status até que uma condição seja atendida ou até que um tempo limite expire. Um temporizador durável controla o intervalo de sondagem. Em seguida, mais trabalho pode ser realizado ou a orquestração pode ser encerrada.

Padrão 5: interação humana

Muitos processos automatizados envolvem algum tipo de interação humana. Envolver humanos em um processo automatizado é complicado, porque as pessoas não estão tão altamente disponíveis nem são tão dinâmicas quanto os serviços de nuvem. Um processo automatizado pode permitir essa interação usando tempos limite e a lógica de compensação.

Um processo de aprovação é um exemplo de um processo empresarial que envolve a interação humana. A aprovação de um gerente pode ser necessária para um relatório de despesas que exceda determinado valor em dólares. Se o gerente não aprovar o relatório de despesas em até 72 horas (talvez ele esteja de férias), um processo de escalonamento será iniciado para obter a aprovação de outra pessoa (talvez o gerente do gerente).

A diagram of the human interaction pattern

Você pode implementar o padrão nesse exemplo usando uma função de orquestrador. O orquestrador usa um temporizador durável para solicitar aprovação. O orquestrador fará o escalonamento se o tempo limite se esgotar. O orquestrador aguarda um evento externo, como uma notificação gerada por uma interação humana.

Estes exemplos criam um processo de aprovação para demonstrar o padrão de interação humana:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Para criar o temporizador durável, chame context.CreateTimer. A notificação é recebida por context.WaitForExternalEvent. Em seguida, Task.WhenAny é chamado para decidir se o próximo passo é escalonar (o tempo limite ocorre primeiro) ou processar a aprovação (a aprovação é recebida antes do tempo limite).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Para criar o temporizador durável, chame context.df.createTimer. A notificação é recebida por context.df.waitForExternalEvent. Em seguida, context.df.Task.any é chamado para decidir se o próximo passo é escalonar (o tempo limite ocorre primeiro) ou processar a aprovação (a aprovação é recebida antes do tempo limite).

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Para criar o temporizador durável, chame context.create_timer. A notificação é recebida por context.wait_for_external_event. Em seguida, context.task_any é chamado para decidir se o próximo passo é escalonar (o tempo limite ocorre primeiro) ou processar a aprovação (a aprovação é recebida antes do tempo limite).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Para criar o temporizador durável, chame Start-DurableTimer. A notificação é recebida por Start-DurableExternalEventListener. Em seguida, Wait-DurableTask é chamado para decidir se o próximo passo é escalonar (o tempo limite ocorre primeiro) ou processar a aprovação (a aprovação é recebida antes do tempo limite).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

A ctx.waitForExternalEvent(...).await() chamada de método pausa a orquestração até receber um evento chamado ApprovalEvent, que tem um payload boolean. Se o evento for recebido, uma função de atividade será chamada para processar o resultado da aprovação. No entanto, se nenhum evento desse tipo for recebido antes da timeout expiração (72 horas), será gerado um TaskCanceledException e a Escalate função de atividade será chamada.

Observação

Não há cobrança pelo tempo gasto aguardando eventos externos durante a execução no plano consumo.

Um cliente externo pode entregar a notificação de eventos para uma função de orquestrador em espera usando as APIs HTTP internas:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

Um evento também pode ser gerado usando o cliente de orquestração durável de outra função no mesmo aplicativo de funções:

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Padrão nº 6: Agregador (entidades com estado)

O sexto padrão trata da agregação de dados de evento durante um período em uma única entidade endereçável. Nesse padrão, os dados que estão sendo agregados podem vir de várias fontes, podem ser entregues em lotes ou podem estar distribuídos por longos períodos. O agregador pode precisar executar uma ação nos dados de evento quando eles são recebidos e os clientes externos talvez precisem consultar os dados agregados.

Aggregator diagram

A complexidade de tentar implementar esse padrão com as funções normais sem estado é que o controle de simultaneidade se torna um grande desafio. Além de se preocupar com vários threads modificando os mesmos dados ao mesmo tempo, você precisa se preocupar em garantir que o agregador só seja executado em uma única VM por vez.

Você pode usar Entidades duráveis para implementar facilmente esse padrão como uma única função.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

As entidades duráveis também podem ser modeladas como classes no .NET. Esse modelo pode ser útil se a lista de operações é fixa e se torna grande. O exemplo a seguir é uma implementação equivalente da entidade Counter usando métodos e classes .NET.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Observação

Atualmente, as entidades duráveis não são compatíveis com o PowerShell.

Observação

Atualmente, as entidades duráveis não dão suporte no Java.

Os clientes podem enfileirar operações de uma função de entidade (também conhecido como "sinalização") usando a associação do cliente de entidade.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Observação

Os proxies gerados dinamicamente também estão disponíveis no .NET para sinalizar entidades para torná-las fortemente tipadas. Além da sinalização, os clientes também podem consultar o estado de uma função de entidade usando métodos fortemente tipados na associação do cliente de orquestração.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Observação

Atualmente, as entidades duráveis não são compatíveis com o PowerShell.

Observação

Atualmente, as entidades duráveis não dão suporte no Java.

As funções de entidade estão disponíveis nas Durable Functions 2.0 e posteriores para C#, JavaScript e Python.

A tecnologia

Nos bastidores, a extensão Durable Functions baseia-se na Durable Task Framework, uma biblioteca open-source no GitHub usada para a criação de fluxos de trabalho em código. Assim como o Azure Functions é a evolução sem servidor do Azure WebJobs, as Durable Functions são a evolução sem servidor da Durable Task Framework. A Microsoft e outras organizações usam a Durable Task Framework extensivamente para automatizar processos críticos. Ele é uma opção natural para o ambiente sem servidor do Azure Functions.

Restrições de código

Para fornecer garantias de execução confiáveis e de execução longa, as funções de orquestrador têm um conjunto de regras de codificação que precisam ser seguidas. Para obter mais informações, confira o artigo Restrições de código na função de orquestrador.

Cobrança

As Durable Functions são cobradas da mesma forma que o Azure Functions. Para saber mais, confira Preços do Azure Functions. Ao executar funções de orquestrador no plano de Consumo do Azure Functions, há alguns comportamentos de cobrança para sua informação. Para obter mais informações sobre esses comportamentos, confira o artigo Cobrança das Durable Functions.

Comece a usar agora

Comece a usar as Durable Functions em menos de 10 minutos concluindo um destes tutoriais de início rápido específicos a uma linguagem:

Nestes guias de início rápido, você criará e testará uma função durável "Olá, Mundo" localmente. Em seguida, você publicará o código de função no Azure. A função que você criará orquestra e encadeia chamadas para outras funções.

Publicações

O Durable Functions é desenvolvido em colaboração com o Microsoft Research. Como resultado, a equipe de Durable Functions produz ativamente artigos e artefatos de pesquisa, como:

Saiba mais

O seguinte vídeo destaca os benefícios das Durable Functions:

Como as Durable Functions são uma extensão avançada do Azure Functions, elas não são apropriadas para todos os aplicativos. Para obter uma comparação com outras tecnologias de orquestração do Azure, confira Comparar o Azure Functions e os Aplicativos Lógicos do Azure.

Próximas etapas