¿Qué es Durable Functions?
Durable Functions es una característica de Azure Functions que permite escribir funciones con estado en un entorno de proceso sin servidor. La extensión permite definir flujos de trabajo con estado mediante la escritura de funciones del orquestador y entidades con estado mediante la escritura de funciones de entidad con el modelo de programación de Azure Functions. En segundo plano, la extensión administra automáticamente el estado, los puntos de comprobación y los reinicios, lo que le permite centrarse en la lógica de negocios.
Idiomas compatibles
Durable Functions está diseñado para trabajar con todos los lenguajes de programación Azure Functions, pero puede tener requisitos mínimos diferentes para cada uno. En la tabla siguiente se muestran las configuraciones de aplicación mínimas admitidas:
Pila de lenguajes | Versiones de Azure Functions Runtime | Versión de trabajo del lenguaje | Versión mínima de conjunto |
---|---|---|---|
.NET/C#/F# | Functions 1.0 | En proceso Fuera de proceso |
N/D |
JavaScript/TypeScript (modelo de programación V3) | Functions 2.0 | Node 8+ | Conjuntos 2.x. |
JavaScript/TypeScript (modelo de programación V4) | Functions 4.25+ | Nodo 18 | 3.15+ agrupaciones |
Python | Functions 2.0 | Python 3.7+ | Conjuntos 2.x. |
Python (modelo de prog. V2) | Functions 4.0 | Python 3.7+ | 3.15+ agrupaciones |
PowerShell | Functions 3.0+ | PowerShell 7+ | Conjuntos 2.x. |
Java | Functions 4.0 | Java 8+ | Conjuntos 4.x |
Importante
En este artículo se usan pestañas para admitir varias versiones del modelo de programación de Node.js. El modelo v4 está disponible de forma general y está diseñado para que los desarrolladores de JavaScript y TypeScript tengan una experiencia más flexible e intuitiva. Para más detalles acerca de cómo funciona el modelo v4, consulte la Guía para desarrolladores de Node.js de Azure Functions. Para obtener más información acerca de las diferencias entre v3 y v4, consulte la Guía de migración.
Importante
En este artículo se usan pestañas para admitir varias versiones del modelo de programación de Python. El modelo v2 está disponible con carácter general y está diseñado para proporcionar una forma más centrada en el código para crear funciones a través de decoradores. Para más información sobre cómo funciona el modelo v2, consulte la Guía para desarrolladores de Python de Azure Functions.
Al igual que con Azure Functions, existen plantillas que le ayudarán a desarrollar Durable Functions mediante Visual Studio, Visual Studio Code y Azure Portal.
Patrones de aplicación
El caso de uso principal para Durable Functions es simplificar los requisitos de coordinación con estado complejos en las aplicaciones sin servidor. Las siguientes secciones describen patrones de aplicación típicos que se pueden beneficiar de Durable Functions:
- Encadenamiento de funciones
- Distribución ramificada de salida y de entrada
- API de HTTP asincrónico
- Supervisión
- Interacción humana
- Agregador (entidades con estado)
Patrón nº 1: Encadenamiento de funciones
En el modelo de encadenamiento de funciones, una secuencia de funciones se ejecuta en un orden específico. Con este patrón, la salida de una función se aplica a la entrada de otra función. El uso de colas entre cada función garantiza el mantenimiento de la durabilidad y escalabilidad del sistema, aunque haya un flujo de control de una función a la siguiente.
Puede utilizar Durable Functions para implementar el patrón de encadenamiento de funciones de forma concisa, como se muestra en el siguiente ejemplo.
En este ejemplo, los valores F1
, F2
, F3
y F4
son los nombres de otras funciones de la aplicación de funciones. Puede implementar el flujo de control mediante construcciones de código imperativas normales. El código se ejecuta de arriba hacia abajo. y puede implicar semántica de flujo de control del lenguaje ya existente, como instrucciones condicionales y bucles. Puede incluir lógica de control de errores en bloques try
/catch
/finally
.
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
Puede usar el parámetro context
para invocar otras funciones por nombre, pasar parámetros y devolver la salida de una función. Cada vez que el código llama a await
, el marco de Durable Functions establece puntos de control del progreso de la instancia actual de la función. Si el proceso o la máquina virtual se reciclan a mitad de la ejecución, la instancia de la función se reanuda desde la llamada a await
anterior. Para obtener más información, consulte la siguiente sección, Patrón 2: Distribución ramificada de entrada y salida.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
try {
const x = yield context.df.callActivity("F1");
const y = yield context.df.callActivity("F2", x);
const z = yield context.df.callActivity("F3", y);
return yield context.df.callActivity("F4", z);
} catch (error) {
// Error handling or compensation goes here.
}
});
Puede usar el objeto context.df
para invocar otras funciones por nombre, pasar parámetros y devolver la salida de una función. Cada vez que el código llama a yield
, el marco de Durable Functions establece puntos de control del progreso de la instancia actual de la función. Si el proceso o la máquina virtual se reciclan a mitad de la ejecución, la instancia de la función se reanuda desde la llamada a yield
anterior. Para obtener más información, consulte la siguiente sección, Patrón 2: Distribución ramificada de entrada y salida.
Nota
En JavaScript, el objeto context
representa todo el contexto de la función. Acceda al contexto de Durable Functions mediante la propiedad df
en el contexto principal.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
Puede usar el objeto context
para invocar otras funciones por nombre, pasar parámetros y devolver la salida de una función. Cada vez que el código llama a yield
, el marco de Durable Functions establece puntos de control del progreso de la instancia actual de la función. Si el proceso o la máquina virtual se reciclan a mitad de la ejecución, la instancia de la función se reanuda desde la llamada a yield
anterior. Para obtener más información, consulte la siguiente sección, Patrón 2: Distribución ramificada de entrada y salida.
Nota
El objeto context
en Python representa el contexto de la orquestación. Acceda al contexto de Azure Functions principal mediante la propiedad function_context
en el contexto de la orquestación.
param($Context)
$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z
Puede usar el comando Invoke-DurableActivity
para invocar otras funciones por nombre, pasar parámetros y devolver la salida de la función. Cada vez que el código llama a Invoke-DurableActivity
sin el modificador NoWait
, el marco de Durable Functions establece un punto de control en el progreso de la instancia actual de la función. Si el proceso o la máquina virtual se reciclan a mitad de la ejecución, la instancia de la función se reanuda desde la llamada a Invoke-DurableActivity
anterior. Para obtener más información, consulte la siguiente sección, Patrón 2: Distribución ramificada de entrada y salida.
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
Puede usar el objeto ctx
para invocar otras funciones por nombre, pasar parámetros y devolver la salida de una función. La salida de estos métodos es un objeto Task<V>
donde V
es el tipo de dato devuelto por la función invocada. Cada vez que se llama a Task<V>.await()
, el marco de Durable Functions comprueba el progreso de la instancia de la función actual. Si el proceso se recicla inesperadamente a mitad de la ejecución, la instancia de la función se reanuda desde la llamada Task<V>.await()
anterior. Para obtener más información, consulte la siguiente sección, Patrón 2: Distribución ramificada de entrada y salida.
Patrón nº 2: Distribución ramificada de salida y de entrada
En el patrón de distribución ramificada de salida y entrada, se ejecutan en paralelo varias funciones y después se espera a que todas finalicen. A menudo se realiza algún trabajo de agregación en los resultados devueltos de las funciones.
Con las funciones normales, puedes realizar la distribución ramificada de salida al hacer que la función envíe varios mensajes a una cola. La distribución ramificada de entrada es mucho más compleja. Para realizarla en una función normal, escribe código para realizar un seguimiento de cuándo finalizan las funciones desencadenadas por la cola y después almacenar la salida de la función.
La extensión de Durable Functions controla este patrón con código relativamente sencillo:
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
El trabajo de distribución ramificada se distribuye en varias instancias de la función F2
. Se realiza un seguimiento del trabajo mediante una lista dinámica de las tareas. Se llama a la Task.WhenAll
para esperar a que todas las funciones llamadas finalicen. Después, los resultados de la función F2
se agregan desde la lista de tareas dinámica y se pasan a la función F3
.
La creación automática de puntos de control que se produce en la llamada a await
en Task.WhenAll
garantiza que cualquier posible bloqueo o reinicio a mitad del proceso no requiera que se reinicien las tareas ya completadas.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
El trabajo de distribución ramificada se distribuye en varias instancias de la función F2
. Se realiza un seguimiento del trabajo mediante una lista dinámica de las tareas. Se llama a context.df.Task.all
API para esperar a que todas las funciones llamadas finalicen. Después, los resultados de la función F2
se agregan desde la lista de tareas dinámica y se pasan a la función F3
.
La creación automática de puntos de control que se produce en la llamada a yield
en context.df.Task.all
garantiza que cualquier posible bloqueo o reinicio a mitad del proceso no requiera que se reinicien las tareas ya completadas.
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
El trabajo de distribución ramificada se distribuye en varias instancias de la función F2
. Se realiza un seguimiento del trabajo mediante una lista dinámica de las tareas. Se llama a context.task_all
API para esperar a que todas las funciones llamadas finalicen. Después, los resultados de la función F2
se agregan desde la lista de tareas dinámica y se pasan a la función F3
.
La creación automática de puntos de control que se produce en la llamada a yield
en context.task_all
garantiza que cualquier posible bloqueo o reinicio a mitad del proceso no requiera que se reinicien las tareas ya completadas.
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
El trabajo de distribución ramificada se distribuye en varias instancias de la función F2
. Observe el uso del modificador NoWait
en la invocación de la función F2
: este modificador permite que el orquestador invoque a F2
sin esperar a que finalice la actividad. Se realiza un seguimiento del trabajo mediante una lista dinámica de las tareas. Se llama al comando Wait-ActivityFunction
para esperar a que finalicen todas las funciones llamadas. Después, los resultados de la función F2
se agregan desde la lista de tareas dinámica y se pasan a la función F3
.
La creación automática de puntos de control que se produce en la llamada a Wait-ActivityFunction
garantiza que cualquier posible bloqueo o reinicio en mitad del proceso no requiera que se reinicien las tareas ya completadas.
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
El trabajo de distribución ramificada se distribuye en varias instancias de la función F2
. Se realiza un seguimiento del trabajo mediante una lista dinámica de las tareas. Se llama a la ctx.allOf(parallelTasks).await()
para esperar a que todas las funciones llamadas finalicen. A continuación, las salidas de la función F2
se agregan a partir de la lista de tareas dinámica y se devuelven como salida de la función del orquestador.
La comprobación automática que se produce en .await()
llamada a ctx.allOf(parallelTasks)
garantiza que un reciclaje inesperado del proceso no requiera el reinicio de ninguna tarea ya completada.
Nota
En raras ocasiones es posible que se produzca un bloqueo en la ventana después de que se complete una función de actividad, pero antes de que su finalización se guarde en el historial de la orquestación. Em ese caso, la función de actividad se volvería a ejecutar desde el principio en cuanto se recuperara el proceso.
Patrón nº 3: Las API de HTTP asincrónico
El patrón de las API HTTP asincrónico soluciona el problema de coordinar el estado de las operaciones de larga duración con los clientes externos. Una forma habitual de implementar este patrón es que un punto de conexión HTTP desencadene la acción de larga duración. A continuación, el cliente se redirige a un punto de conexión de estado al que sondea para saber cuando finalice la operación.
Durable Functions proporciona compatibilidad integrada con este patrón, lo que simplifica, o incluso elimina, el código que hay que escribir para interactuar con ejecuciones de funciones de larga duración. Por ejemplo, los ejemplos que se incluyen en el inicio rápido de Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell y Java) muestran un comando de REST simple que se puede usar para iniciar nuevas instancias de funciones del orquestador. Tras iniciar una instancia, la extensión expone las API de HTTP de webhook que consultan el estado de la función del orquestador.
En el ejemplo siguiente se muestran los comandos REST para iniciar un orquestador y consultar su estado. Para que el ejemplo se vea con mayor claridad se omiten algunos detalles del protocolo.
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}
Dado que el entorno en tiempo de ejecución de Durable Functions administra el estado, no es necesario que implemente su propio mecanismo de seguimiento del estado.
La extensión Durable Functions expone las API HTTP integradas que administran orquestaciones de larga duración. Este patrón también lo puede implementar usted mismo si utiliza sus propios desencadenadores de funciones (por ejemplo, HTTP, una cola o Azure Event Hubs) y el enlace del cliente de Durable. Por ejemplo, puede utilizar un mensaje de cola para desencadenar la terminación. O bien, puede usar un desencadenador de HTTP que esté protegido por una directiva de autenticación de Microsoft Entra, en lugar de las API HTTP integradas que usan una clave generada para la autenticación.
Para más información, consulte el artículo acerca de las características de HTTP, en el que se explica cómo se pueden exponer procesos asincrónicos de larga duración a través de HTTP con la extensión Durable Functions.
Patrón 4: Supervisión
El patrón de supervisión hace referencia a un proceso flexible y periódico en un flujo de trabajo. Un ejemplo es el sondeo hasta que se cumplen condiciones específicas. Puede usar un desencadenador de temporizador normal para solucionar un escenario simple (como un trabajo de limpieza periódico), pero el intervalo es estático y resulta más difícil administrar los ciclos de vida de las instancias. Puede usar Durable Functions para crear intervalos de periodicidad flexible, administrar el ciclo de vida de las tareas y crear varios procesos de supervisión a partir de una única orquestación.
Un ejemplo del patrón de supervisión es invertir el escenario anterior de la API de HTTP asincrónica. En lugar de exponer un punto de conexión para que un cliente externo supervise una operación de larga duración, el monitor de larga duración consume el punto de conexión externo y espera algún cambio de estado.
Con unas cuantas líneas de código, puede utilizar Durable Functions para crear varios monitores que observen puntos de conexión arbitrarios. Los monitores pueden finalizar la ejecución cuando se cumple una condición u otra función puede usar el cliente de orquestación durable para terminar los monitores. Puede cambiar el intervalo de wait
del monitor según una condición específica (por ejemplo, retroceso exponencial).
El código siguiente implementa un monitor básico:
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
Cuando se recibe una solicitud, se crea una nueva instancia de orquestación para ese identificador de trabajo. La instancia sondea un estado hasta que se cumple una condición o hasta que expira un tiempo de espera. Un temporizador durable controla el intervalo de sondeo. Después, se puede realizar trabajo adicional o puede finalizar la orquestación.
Patrón nº 5: Interacción humana
Muchos procesos automatizados implican algún tipo de interacción humana. La intervención humana en un proceso automatizado es más difícil, ya que las personas no tienen la misma alta disponibilidad y capacidad de respuesta que los servicios en la nube. Los procesos automatizados pueden permitir esta interacción mediante el uso de tiempos de expiración y la lógica de compensación.
Un proceso de aprobación es un ejemplo de un proceso empresarial que implica la interacción humana. Para un informe de gastos que supera un importe en dólares determinado puede ser necesaria la aprobación de un administrador. Si el administrador no aprueba el informe de gastos en un plazo de 72 horas (puede estar de vacaciones), se inicia un proceso de escalado para obtener la aprobación de otra persona (por ejemplo el jefe del administrador).
Puede implementar el patrón en este ejemplo mediante una función de orquestador. El orquestador usa un temporizador durable para solicitar la aprobación, y la escala si se agota el tiempo de espera. El orquestador espera a que ocurra un evento externo, como una notificación generada por interacción humana.
Estos ejemplos crean un proceso de aprobación para hacer una demostración del patrón de interacción humana:
[FunctionName("ApprovalWorkflow")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}
Para crear el temporizador durable, llame a context.CreateTimer
. context.WaitForExternalEvent
recibe la notificación. Después, se realiza una llamada a Task.WhenAny
para decidir si la aprobación se escala (primero se agota el tiempo de expiración) o se procesa (la aprobación antes se recibe de que se agote el tiempo de expiración).
const df = require("durable-functions");
const moment = require('moment');
module.exports = df.orchestrator(function*(context) {
yield context.df.callActivity("RequestApproval");
const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
const durableTimeout = context.df.createTimer(dueTime.toDate());
const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
if (winningEvent === approvalEvent) {
durableTimeout.cancel();
yield context.df.callActivity("ProcessApproval", approvalEvent.result);
} else {
yield context.df.callActivity("Escalate");
}
});
Para crear el temporizador durable, llame a context.df.createTimer
. context.df.waitForExternalEvent
recibe la notificación. Después, se realiza una llamada a context.df.Task.any
para decidir si la aprobación se escala (primero se agota el tiempo de expiración) o se procesa (la aprobación antes se recibe de que se agote el tiempo de expiración).
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
yield context.call_activity("RequestApproval", None)
due_time = context.current_utc_datetime + timedelta(hours=72)
durable_timeout_task = context.create_timer(due_time)
approval_event_task = context.wait_for_external_event("ApprovalEvent")
winning_task = yield context.task_any([approval_event_task, durable_timeout_task])
if approval_event_task == winning_task:
durable_timeout_task.cancel()
yield context.call_activity("ProcessApproval", approval_event_task.result)
else:
yield context.call_activity("Escalate", None)
main = df.Orchestrator.create(orchestrator_function)
Para crear el temporizador durable, llame a context.create_timer
. context.wait_for_external_event
recibe la notificación. Después, se realiza una llamada a context.task_any
para decidir si la aprobación se escala (primero se agota el tiempo de expiración) o se procesa (la aprobación antes se recibe de que se agote el tiempo de expiración).
param($Context)
$output = @()
$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId
$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId
$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait
$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any
if ($approvalEvent -eq $firstEvent) {
Stop-DurableTimerTask -Task $durableTimeoutEvent
$output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
$output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}
$output
Para crear el temporizador durable, llame a Start-DurableTimer
. Start-DurableExternalEventListener
recibe la notificación. Después, se realiza una llamada a Wait-DurableTask
para decidir si la aprobación se escala (primero se agota el tiempo de expiración) o se procesa (la aprobación antes se recibe de que se agote el tiempo de expiración).
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
La llamada de método ctx.waitForExternalEvent(...).await()
pone en pausa la orquestación hasta que recibe un evento llamado ApprovalEvent
, que tiene una carga útil boolean
. Si se recibe el evento, se llama a una función de actividad para procesar el resultado de la aprobación. Sin embargo, si no se recibe dicho evento antes de que expiren timeout
las (72 horas), se levanta un TaskCanceledException
y se llama a la función de actividad Escalate
.
Nota
No hay ningún cargo por el tiempo dedicado a esperar eventos externos al ejecutarse en el plan de consumo.
Un cliente externo puede enviar la notificación de eventos a una función de orquestador en espera mediante las API HTTP integradas:
curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"
También se pueden generar eventos mediante el cliente de orquestación durable desde otra función de la misma aplicación de funciones:
[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
[HttpTrigger] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
bool isApproved = true;
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
import azure.durable_functions as df
async def main(client: str):
durable_client = df.DurableOrchestrationClient(client)
is_approved = True
await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"
@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
@HttpTrigger(name = "instanceId") String instanceId,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
client.raiseEvent(instanceId, "ApprovalEvent", true);
}
Patrón 6: Agregador (entidades con estado)
El sexto patrón se trata de agregar datos de eventos durante un período de tiempo en una solaentidad direccionable. En este patrón, los datos que se agreguen pueden proceder de varios orígenes, pueden entregarse en lotes o pueden estar dispersos en largos períodos de tiempo. Es posible que el agregador tome medidas según datos de eventos a medida que llegan, y puede que los clientes externos necesiten consultar los datos agregados.
El aspecto más difícil de implementar este patrón con funciones normales sin estado es que el control de simultaneidad se convierte en un gran desafío. No solo es necesario preocuparse por varios subprocesos que modifican los mismos datos al mismo tiempo, también deberá preocuparse por garantizar que el agregador se ejecuta solo en una única VM a la vez.
Puede usar entidades duraderas para implementar fácilmente este patrón como una sola función.
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
Las entidades duraderas también pueden modelarse como clases en .NET. Este modelo puede ser útil si la lista de operaciones es fija y de gran tamaño. El ejemplo siguiente es una implementación equivalente de la entidad Counter
que usa clases y métodos .NET.
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");
module.exports = df.entity(function(context) {
const currentValue = context.df.getState(() => 0);
switch (context.df.operationName) {
case "add":
const amount = context.df.getInput();
context.df.setState(currentValue + amount);
break;
case "reset":
context.df.setState(0);
break;
case "get":
context.df.return(currentValue);
break;
}
});
import azure.functions as func
import azure.durable_functions as df
def entity_function(context: df.DurableOrchestrationContext):
current_value = context.get_state(lambda: 0)
operation = context.operation_name
if operation == "add":
amount = context.get_input()
current_value += amount
context.set_result(current_value)
elif operation == "reset":
current_value = 0
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
main = df.Entity.create(entity_function)
Nota:
Actualmente, las entidades duraderas no se admiten en PowerShell.
Nota
Las entidades durable no se admiten actualmente en Java.
Los clientes pueden poner en cola las operaciones (también conocidas como "señalización") de una función de entidad mediante el enlace del cliente de la entidad.
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
Nota:
Los proxies generados dinámicamente también están disponibles en .NET para la señalización de entidades con seguridad de tipos. Y, además de la señalización, los clientes también pueden consultar el estado de cualquier función de entidad mediante métodos con seguridad de tipos en el enlace del cliente de orquestación.
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
entity_id = df.EntityId("Counter", "myCounter")
instance_id = await client.signal_entity(entity_id, "add", 1)
return func.HttpResponse("Entity signaled")
Las funciones de entidad están disponibles en Durable Functions 2.0 y versiones posteriores para C#, JavaScript y Python.
La tecnología
En segundo plano, la extensión Durable Functions se crea con Durable Task Framework, que es una biblioteca de código abierto que se encuentra en GitHub y se usa para crear flujos de trabajo en el código. Así como Azure Functions es la evolución sin servidor de Azure WebJobs, Durable Functions es la evolución sin servidor de Durable Task Framework. Microsoft y otras organizaciones utilizan Durable Task Framework ampliamente para automatizar los procesos críticos. Es una opción natural para el entorno de Azure Functions sin servidor.
Restricciones del código
Con el fin de proporcionar garantías de ejecución prolongada y confiable, las funciones del orquestador tienen un conjunto de reglas de codificación que deben seguirse. Para más información, consulte el artículo acerca de las restricciones del código de las funciones de Orchestrator.
Facturación
Durable Functions se factura igual que Azure Functions. Para más información, consulte los precios de Azure Functions. Al ejecutar funciones de orquestador en el plan de consumo de Azure Functions, hay algunos comportamientos de facturación que deben tenerse en cuenta. Para más información acerca de estos comportamientos, consulte el artículo en el que se explica la facturación de Durable Functions.
Comenzar de inmediato
Puede empezar a trabajar con Durable Functions en menos de 10 minutos completando uno de estos tutoriales de inicio rápido específicos del idioma:
- C# con Visual Studio 2019
- JavaScript con Visual Studio Code
- TypeScript con Visual Studio Code
- Python mediante Visual Studio Code
- PowerShell con Visual Studio Code
- Java con Maven
En estos inicios rápidos, creará y probará localmente una función durable de "hello world". Luego, publicará el código de función en Azure. La función que crea organiza y encadena llamadas a otras funciones.
Publicaciones
Durable Functions se ha desarrollado en colaboración con Microsoft Research. Como resultado, el equipo de Durable Functions genera activamente documentos y artefactos de investigación; entre estos, se incluyen:
- Durable Functions: Semántica para informática sin servidor con estado (OOPSLA'21)
- Flujos de trabajo sin servidor con Durable Functions y Netherite (impresión previa)
Saber más
El siguiente vídeo resalta las ventajas de Durable Functions:
Durable Functions es una extensión avanzada para Azure Functions que no es adecuada para todas las aplicaciones. Para obtener una comparación con otras tecnologías de orquestación de Azure, consulte Comparativa entre Azure Functions y Azure Logic Apps.