O que são Durable Functions?
Durable Functions é um recurso do Azure Functions que permite escrever funções com monitoração de estado em um ambiente de computação sem servidor. A extensão permite definir fluxos de trabalho com monitoração de estado escrevendo funções de orquestrador e entidades com estado escrevendo funções de entidade usando o modelo de programação do Azure Functions. Nos bastidores, a extensão gerencia o estado, os pontos de verificação e as reinicializações para você, permitindo que você se concentre em sua lógica de negócios.
Idiomas suportados
O Durable Functions foi projetado para funcionar com todas as linguagens de programação do Azure Functions, mas pode ter requisitos mínimos diferentes para cada idioma. A tabela a seguir mostra as configurações mínimas de aplicativos suportadas:
Pilha de idiomas | Versões do Azure Functions Runtime | Versão para trabalhadores linguísticos | Versão mínima dos pacotes |
---|---|---|---|
.NET / C# / F# | Funções 1.0+ | Em processo Fora do processo |
n/d |
JavaScript/TypeScript (modelo V3 prog.) | Funções 2.0+ | Nó 8+ | Pacotes 2.x |
JavaScript/TypeScript (modelo V4 prog.) | Funções 4.25+ | Nó 18+ | 3.15+ pacotes |
Python | Funções 2.0+ | Python 3.7+ | Pacotes 2.x |
Python (V2 prog. model) | Funções 4.0+ | Python 3.7+ | 3.15+ pacotes |
PowerShell | Funções 3.0+ | PowerShell 7+ | Pacotes 2.x |
Java | Funções 4.0+ | Java 8+ | Pacotes 4.x |
Importante
Este artigo usa guias para oferecer suporte a várias versões do modelo de programação Node.js. O modelo v4 está geralmente disponível e foi projetado para ter uma experiência mais flexível e intuitiva para desenvolvedores JavaScript e TypeScript. Para obter mais detalhes sobre como o modelo v4 funciona, consulte o Guia do desenvolvedor do Azure Functions Node.js. Para saber mais sobre as diferenças entre v3 e v4, consulte o guia de migração.
Importante
Este artigo usa guias para suportar várias versões do modelo de programação Python. O modelo v2 está geralmente disponível e foi projetado para fornecer uma maneira mais centrada em código para a criação de funções por meio de decoradores. Para obter mais detalhes sobre como o modelo v2 funciona, consulte o guia do desenvolvedor Python do Azure Functions.
Como o Azure Functions, há modelos para ajudá-lo a desenvolver Durable Functions usando o Visual Studio, o Visual Studio Code e o portal do Azure.
Padrões de aplicação
O principal caso de uso do Durable Functions é a simplificação de requisitos de coordenação complexos e com monitoração de estado em aplicativos sem servidor. As seções a seguir descrevem padrões de aplicativos típicos que podem se beneficiar de funções duráveis:
- Encadeamento de funções
- Fan-out/fan-in
- APIs HTTP assíncronas
- Monitorização
- Interação humana
- Agregador (entidades com estado)
Padrão #1: Encadeamento de funções
No padrão de encadeamento de funções, uma sequência de funções é executada em uma ordem específica. Neste padrão, a saída de uma função é aplicada à entrada de outra função. O uso de filas entre cada função garante que o sistema permaneça durável e escalável, mesmo que haja um fluxo de controle de uma função para outra.
Você pode usar Durable Functions para implementar o padrão de encadeamento de funções de forma concisa, conforme mostrado no exemplo a seguir.
Neste exemplo, os valores F1
, F2
, F3
, e F4
são os nomes de outras funções no mesmo aplicativo de função. Você pode implementar o fluxo de controle usando construções de codificação imperativas normais. O código é executado de cima para baixo. O código pode envolver semântica de fluxo de controle de linguagem existente, como condicionais e loops. Você pode incluir lógica de tratamento de erros em try
//catch
finally
blocos.
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
Você pode usar o context
parâmetro para invocar outras funções por nome, parâmetros de passagem e saída de função de retorno. Cada vez que o código chama await
, a estrutura Durable Functions verifica o progresso da instância de função atual. Se o processo ou a máquina virtual for reciclada no meio da execução, a instância da função será retomada da chamada anterior await
. Para obter mais informações, consulte a próxima seção, Padrão #2: Fan out/fan in.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
try {
const x = yield context.df.callActivity("F1");
const y = yield context.df.callActivity("F2", x);
const z = yield context.df.callActivity("F3", y);
return yield context.df.callActivity("F4", z);
} catch (error) {
// Error handling or compensation goes here.
}
});
Você pode usar o context.df
objeto para invocar outras funções por nome, parâmetros de passagem e saída de função de retorno. Cada vez que o código chama yield
, a estrutura Durable Functions verifica o progresso da instância de função atual. Se o processo ou a máquina virtual for reciclada no meio da execução, a instância da função será retomada da chamada anterior yield
. Para obter mais informações, consulte a próxima seção, Padrão #2: Fan out/fan in.
Nota
O context
objeto em JavaScript representa todo o contexto da função. Acesse o contexto Durable Functions usando a df
propriedade no contexto principal.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
Você pode usar o context
objeto para invocar outras funções por nome, parâmetros de passagem e saída de função de retorno. Cada vez que o código chama yield
, a estrutura Durable Functions verifica o progresso da instância de função atual. Se o processo ou a máquina virtual for reciclada no meio da execução, a instância da função será retomada da chamada anterior yield
. Para obter mais informações, consulte a próxima seção, Padrão #2: Fan out/fan in.
Nota
O context
objeto em Python representa o contexto de orquestração. Acesse o contexto principal do Azure Functions usando a function_context
propriedade no contexto de orquestração.
param($Context)
$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z
Você pode usar o Invoke-DurableActivity
comando para invocar outras funções por nome, parâmetros de passagem e saída de função de retorno. Cada vez que o código chama Invoke-DurableActivity
sem o NoWait
switch, a estrutura Durable Functions verifica o progresso da instância de função atual. Se o processo ou a máquina virtual for reciclada no meio da execução, a instância da função será retomada da chamada anterior Invoke-DurableActivity
. Para obter mais informações, consulte a próxima seção, Padrão #2: Fan out/fan in.
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
Você pode usar o ctx
objeto para invocar outras funções por nome, parâmetros de passagem e saída de função de retorno. A saída desses métodos é um Task<V>
objeto onde V
é o tipo de dados retornados pela função invocada. Sempre que você chama Task<V>.await()
o , a estrutura Durable Functions verifica o progresso da instância de função atual. Se o processo for reciclado inesperadamente no meio da execução, a instância da função será retomada da chamada anterior Task<V>.await()
. Para obter mais informações, consulte a próxima seção, Padrão #2: Fan out/fan in.
Padrão #2: Saída de ventilador/entrada de ventilador
No padrão fan out/fan in, você executa várias funções em paralelo e, em seguida, aguarda que todas as funções terminem. Muitas vezes, algum trabalho de agregação é feito nos resultados que são retornados das funções.
Com funções normais, você pode se espalhar fazendo com que a função envie várias mensagens para uma fila. Voltar a entrar é muito mais desafiador. Para ventilar, em uma função normal, você escreve código para controlar quando as funções acionadas por fila terminam e, em seguida, armazena saídas de função.
A extensão Durable Functions lida com esse padrão com código relativamente simples:
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
O trabalho de distribuição é distribuído para várias instâncias da F2
função. O trabalho é controlado usando uma lista dinâmica de tarefas. Task.WhenAll
é chamado para aguardar que todas as funções chamadas terminem. Em seguida, as F2
saídas da função são agregadas da lista de tarefas dinâmicas e passadas para a F3
função.
O ponto de verificação automático que acontece na await
chamada garante Task.WhenAll
que uma possível falha ou reinicialização no meio do caminho não exija a reinicialização de uma tarefa já concluída.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
O trabalho de distribuição é distribuído para várias instâncias da F2
função. O trabalho é controlado usando uma lista dinâmica de tarefas. context.df.Task.all
A API é chamada para aguardar a conclusão de todas as funções chamadas. Em seguida, as F2
saídas da função são agregadas da lista de tarefas dinâmicas e passadas para a F3
função.
O ponto de verificação automático que acontece na yield
chamada garante context.df.Task.all
que uma possível falha ou reinicialização no meio do caminho não exija a reinicialização de uma tarefa já concluída.
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
O trabalho de distribuição é distribuído para várias instâncias da F2
função. O trabalho é controlado usando uma lista dinâmica de tarefas. context.task_all
A API é chamada para aguardar a conclusão de todas as funções chamadas. Em seguida, as F2
saídas da função são agregadas da lista de tarefas dinâmicas e passadas para a F3
função.
O ponto de verificação automático que acontece na yield
chamada garante context.task_all
que uma possível falha ou reinicialização no meio do caminho não exija a reinicialização de uma tarefa já concluída.
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
O trabalho de distribuição é distribuído para várias instâncias da F2
função. Observe o uso do interruptor na chamada de F2
função: este interruptor permite que o orquestrador prossiga invocando F2
sem esperar pela conclusão da NoWait
atividade. O trabalho é controlado usando uma lista dinâmica de tarefas. O Wait-ActivityFunction
comando é chamado para aguardar que todas as funções chamadas terminem. Em seguida, as F2
saídas da função são agregadas da lista de tarefas dinâmicas e passadas para a F3
função.
O ponto de verificação automático que acontece na Wait-ActivityFunction
chamada garante que uma possível falha ou reinicialização no meio do caminho não exija a reinicialização de uma tarefa já concluída.
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
O trabalho de distribuição é distribuído para várias instâncias da F2
função. O trabalho é controlado usando uma lista dinâmica de tarefas. ctx.allOf(parallelTasks).await()
é chamado para aguardar que todas as funções chamadas terminem. Em seguida, as F2
saídas de função são agregadas da lista de tarefas dinâmicas e retornadas como saída da função orquestradora.
O ponto de verificação automático que acontece na .await()
chamada garante ctx.allOf(parallelTasks)
que uma reciclagem de processo inesperada não exija a reinicialização de nenhuma tarefa já concluída.
Nota
Em circunstâncias raras, é possível que uma falha aconteça na janela após a conclusão de uma função de atividade, mas antes que sua conclusão seja salva no histórico de orquestração. Se isso acontecer, a função de atividade será executada novamente desde o início após a recuperação do processo.
Padrão #3: APIs HTTP assíncronas
O padrão de API HTTP assíncrona resolve o problema de coordenar o estado de operações de longa execução com clientes externos. Uma maneira comum de implementar esse padrão é fazer com que um ponto de extremidade HTTP acione a ação de longa execução. Em seguida, redirecione o cliente para um ponto de extremidade de status que o cliente sonda para saber quando a operação for concluída.
Durable Functions fornece suporte interno para esse padrão, simplificando ou até mesmo removendo o código que você precisa escrever para interagir com execuções de função de longa duração. Por exemplo, os exemplos de início rápido de Funções Duráveis (C#, JavaScript, TypeScript, Python, PowerShell e Java) mostram um comando REST simples que você pode usar para iniciar novas instâncias de função do orquestrador. Depois que uma instância é iniciada, a extensão expõe APIs HTTP de webhook que consultam o status da função do orquestrador.
O exemplo a seguir mostra comandos REST que iniciam um orquestrador e consultam seu status. Para maior clareza, alguns detalhes do protocolo são omitidos do exemplo.
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}
Como o tempo de execução do Durable Functions gerencia o estado para você, você não precisa implementar seu próprio mecanismo de controle de status.
A extensão Durable Functions expõe APIs HTTP internas que gerenciam orquestrações de longa duração. Como alternativa, você mesmo pode implementar esse padrão usando seus próprios gatilhos de função (como HTTP, uma fila ou Hubs de Eventos do Azure) e a vinculação de cliente durável. Por exemplo, você pode usar uma mensagem de fila para disparar o encerramento. Ou, você pode usar um gatilho HTTP protegido por uma política de autenticação do Microsoft Entra em vez das APIs HTTP internas que usam uma chave gerada para autenticação.
Para obter mais informações, consulte o artigo Recursos HTTP, que explica como você pode expor processos assíncronos e de longa execução sobre HTTP usando a extensão Durable Functions.
Padrão #4: Monitor
O padrão de monitor refere-se a um processo flexível e recorrente em um fluxo de trabalho. Um exemplo é a sondagem até que condições específicas sejam atendidas. Você pode usar um gatilho de temporizador regular para abordar um cenário básico, como um trabalho de limpeza periódica, mas seu intervalo é estático e o gerenciamento da vida útil da instância torna-se complexo. Você pode usar funções duráveis para criar intervalos de recorrência flexíveis, gerenciar tempos de vida de tarefas e criar vários processos de monitor a partir de uma única orquestração.
Um exemplo do padrão de monitor é reverter o cenário anterior de API HTTP assíncrona. Em vez de expor um ponto de extremidade para um cliente externo monitorar uma operação de longa execução, o monitor de longa execução consome um ponto de extremidade externo e, em seguida, aguarda uma alteração de estado.
Em algumas linhas de código, você pode usar funções duráveis para criar vários monitores que observam pontos de extremidade arbitrários. Os monitores podem encerrar a execução quando uma condição é atendida, ou outra função pode usar o cliente de orquestração durável para encerrar os monitores. Você pode alterar o intervalo de wait
um monitor com base em uma condição específica (por exemplo, recuo exponencial).
O código a seguir implementa um monitor básico:
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
Quando uma solicitação é recebida, uma nova instância de orquestração é criada para esse ID de trabalho. A instância sonda um status até que uma condição seja atendida ou até que um tempo limite expire. Um temporizador durável controla o intervalo de sondagem. Então, mais trabalho pode ser executado, ou a orquestração pode terminar.
Padrão #5: Interação humana
Muitos processos automatizados envolvem algum tipo de interação humana. Envolver humanos em um processo automatizado é complicado porque as pessoas não são tão altamente disponíveis e responsivas quanto os serviços em nuvem. Um processo automatizado pode permitir essa interação usando tempos limite e lógica de compensação.
Um processo de aprovação é um exemplo de um processo de negócios que envolve interação humana. A aprovação de um gerente pode ser necessária para um relatório de despesas que exceda um determinado valor em dólares. Se o gerente não aprovar o relatório de despesas dentro de 72 horas (pode ser que o gerente tenha saído de férias), um processo de escalonamento entra em ação para obter a aprovação de outra pessoa (talvez o gerente do gerente).
Você pode implementar o padrão neste exemplo usando uma função orchestrator. O orquestrador usa um temporizador durável para solicitar aprovação. O orquestrador aumenta se o tempo limite ocorrer. O orquestrador espera por um evento externo, como uma notificação gerada por uma interação humana.
Estes exemplos criam um processo de aprovação para demonstrar o padrão de interação humana:
[FunctionName("ApprovalWorkflow")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}
Para criar o temporizador durável, chame context.CreateTimer
. A notificação é recebida pela context.WaitForExternalEvent
. Em seguida, Task.WhenAny
é chamado para decidir se deve escalar (o tempo limite acontece primeiro) ou processar a aprovação (a aprovação é recebida antes do tempo limite).
const df = require("durable-functions");
const moment = require('moment');
module.exports = df.orchestrator(function*(context) {
yield context.df.callActivity("RequestApproval");
const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
const durableTimeout = context.df.createTimer(dueTime.toDate());
const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
if (winningEvent === approvalEvent) {
durableTimeout.cancel();
yield context.df.callActivity("ProcessApproval", approvalEvent.result);
} else {
yield context.df.callActivity("Escalate");
}
});
Para criar o temporizador durável, chame context.df.createTimer
. A notificação é recebida pela context.df.waitForExternalEvent
. Em seguida, context.df.Task.any
é chamado para decidir se deve escalar (o tempo limite acontece primeiro) ou processar a aprovação (a aprovação é recebida antes do tempo limite).
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
yield context.call_activity("RequestApproval", None)
due_time = context.current_utc_datetime + timedelta(hours=72)
durable_timeout_task = context.create_timer(due_time)
approval_event_task = context.wait_for_external_event("ApprovalEvent")
winning_task = yield context.task_any([approval_event_task, durable_timeout_task])
if approval_event_task == winning_task:
durable_timeout_task.cancel()
yield context.call_activity("ProcessApproval", approval_event_task.result)
else:
yield context.call_activity("Escalate", None)
main = df.Orchestrator.create(orchestrator_function)
Para criar o temporizador durável, chame context.create_timer
. A notificação é recebida pela context.wait_for_external_event
. Em seguida, context.task_any
é chamado para decidir se deve escalar (o tempo limite acontece primeiro) ou processar a aprovação (a aprovação é recebida antes do tempo limite).
param($Context)
$output = @()
$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId
$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId
$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait
$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any
if ($approvalEvent -eq $firstEvent) {
Stop-DurableTimerTask -Task $durableTimeoutEvent
$output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
$output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}
$output
Para criar o temporizador durável, chame Start-DurableTimer
. A notificação é recebida pela Start-DurableExternalEventListener
. Em seguida, Wait-DurableTask
é chamado para decidir se deve escalar (o tempo limite acontece primeiro) ou processar a aprovação (a aprovação é recebida antes do tempo limite).
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
A ctx.waitForExternalEvent(...).await()
chamada de método pausa a orquestração até receber um evento chamado ApprovalEvent
, que tem uma boolean
carga útil. Se o evento for recebido, uma função de atividade será chamada para processar o resultado da aprovação. No entanto, se nenhum evento desse tipo for recebido antes que o timeout
(72 horas) expire, a TaskCanceledException
será gerado e a Escalate
função de atividade será chamada.
Nota
Não há cobrança pelo tempo gasto aguardando eventos externos ao executar no plano de Consumo.
Um cliente externo pode entregar a notificação de evento para uma função de orquestrador em espera usando as APIs HTTP internas:
curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"
Um evento também pode ser gerado usando o cliente de orquestração durável de outra função no mesmo aplicativo de função:
[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
[HttpTrigger] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
bool isApproved = true;
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
import azure.durable_functions as df
async def main(client: str):
durable_client = df.DurableOrchestrationClient(client)
is_approved = True
await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"
@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
@HttpTrigger(name = "instanceId") String instanceId,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
client.raiseEvent(instanceId, "ApprovalEvent", true);
}
Padrão #6: Agregador (entidades com estado)
O sexto padrão é sobre a agregação de dados de eventos durante um período de tempo em uma única entidade endereçável. Nesse padrão, os dados que estão sendo agregados podem vir de várias fontes, podem ser entregues em lotes ou podem estar espalhados por longos períodos de tempo. O agregador pode precisar tomar medidas sobre os dados de eventos à medida que eles chegam, e os clientes externos podem precisar consultar os dados agregados.
O complicado de tentar implementar esse padrão com funções normais e sem estado é que o controle de simultaneidade se torna um enorme desafio. Você não só precisa se preocupar com vários threads modificando os mesmos dados ao mesmo tempo, mas também precisa se preocupar em garantir que o agregador seja executado apenas em uma única VM de cada vez.
Você pode usar entidades duráveis para implementar facilmente esse padrão como uma única função.
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
Entidades duráveis também podem ser modeladas como classes no .NET. Este modelo pode ser útil se a lista de operações for fixa e se tornar grande. O exemplo a seguir é uma implementação equivalente da Counter
entidade usando classes e métodos .NET.
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");
module.exports = df.entity(function(context) {
const currentValue = context.df.getState(() => 0);
switch (context.df.operationName) {
case "add":
const amount = context.df.getInput();
context.df.setState(currentValue + amount);
break;
case "reset":
context.df.setState(0);
break;
case "get":
context.df.return(currentValue);
break;
}
});
import azure.functions as func
import azure.durable_functions as df
def entity_function(context: df.DurableOrchestrationContext):
current_value = context.get_state(lambda: 0)
operation = context.operation_name
if operation == "add":
amount = context.get_input()
current_value += amount
context.set_result(current_value)
elif operation == "reset":
current_value = 0
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
main = df.Entity.create(entity_function)
Nota
Atualmente, não há suporte para entidades duráveis no PowerShell.
Nota
Atualmente, entidades duráveis não são suportadas em Java.
Os clientes podem enfileirar operações para (também conhecida como "sinalização") uma função de entidade usando a associação de cliente de entidade.
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
Nota
Proxies gerados dinamicamente também estão disponíveis no .NET para sinalizar entidades de forma segura para tipos. E, além da sinalização, os clientes também podem consultar o estado de uma função de entidade usando métodos seguros de tipo na ligação do cliente de orquestração.
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
entity_id = df.EntityId("Counter", "myCounter")
instance_id = await client.signal_entity(entity_id, "add", 1)
return func.HttpResponse("Entity signaled")
As funções de entidade estão disponíveis em Durable Functions 2.0 e superior para C#, JavaScript e Python.
A tecnologia
Nos bastidores, a extensão Durable Functions é construída sobre o Durable Task Framework, uma biblioteca de código aberto no GitHub que é usada para criar fluxos de trabalho em código. Como o Azure Functions é a evolução sem servidor do Azure WebJobs, o Durable Functions é a evolução sem servidor do Durable Task Framework. A Microsoft e outras organizações usam o Durable Task Framework extensivamente para automatizar processos de missão crítica. É um ajuste natural para o ambiente do Azure Functions sem servidor.
Restrições de código
A fim de fornecer garantias de execução confiáveis e de longa duração, as funções do orquestrador têm um conjunto de regras de codificação que devem ser seguidas. Para obter mais informações, consulte o artigo Restrições de código de função do Orchestrator.
Faturação
As Funções Duráveis são cobradas da mesma forma que as Funções do Azure. Para obter mais informações, consulte Preços do Azure Functions. Ao executar funções de orquestrador no plano de Consumo do Azure Functions, há alguns comportamentos de cobrança a serem observados. Para obter mais informações sobre esses comportamentos, consulte o artigo Faturamento de funções duráveis.
Saltar para a direita
Você pode começar a usar as Funções Duráveis em menos de 10 minutos concluindo um destes tutoriais de início rápido específicos do idioma:
- C# usando o Visual Studio 2019
- JavaScript usando o Visual Studio Code
- TypeScript usando o Visual Studio Code
- Python usando o Visual Studio Code
- PowerShell usando o Visual Studio Code
- Java usando Maven
Nesses inícios rápidos, você cria e testa localmente uma função durável "hello world". Em seguida, publique o código de função no Azure. A função que você cria orquestra e encadeia chamadas para outras funções.
Publicações
Durable Functions é desenvolvido em colaboração com a Microsoft Research. Como resultado, a equipe de Funções Duráveis produz ativamente artigos de pesquisa e artefatos; Estes incluem:
- Funções duráveis: semântica para Stateful Serverless (OOPSLA'21)
- Fluxos de trabalho sem servidor com funções duráveis e Netherite (pré-impressão)
Mais informações
O vídeo a seguir destaca os benefícios das funções duráveis:
Como o Durable Functions é uma extensão avançada para o Azure Functions, ele não é apropriado para todos os aplicativos. Para obter uma comparação com outras tecnologias de orquestração do Azure, consulte Comparar o Azure Functions e os Aplicativos Lógicos do Azure.