Compartir vía


¿Qué es Durable Functions?

Durable Functions es una característica de Azure Functions que permite escribir funciones con estado en un entorno de proceso sin servidor. La extensión permite definir flujos de trabajo con estado mediante la escritura de funciones del orquestador y entidades con estado mediante la escritura de funciones de entidad con el modelo de programación de Azure Functions. En segundo plano, la extensión administra automáticamente el estado, los puntos de comprobación y los reinicios, lo que le permite centrarse en la lógica de negocios.

Idiomas compatibles

Durable Functions está diseñado para trabajar con todos los lenguajes de programación Azure Functions, pero puede tener requisitos mínimos diferentes para cada uno. En la tabla siguiente se muestran las configuraciones de aplicación mínimas admitidas:

Pila de lenguajes Versiones en tiempo de ejecución de Azure Functions Versión de trabajo del lenguaje Versión mínima de paquetes
.NET/C#/F# Funciones 1.0+ En proceso
Fuera de proceso
n/a
JavaScript/TypeScript (modelo v3). Funciones 2.0+ Node 8+ Conjuntos 2.x
JavaScript/TypeScript (modelo v4). Funciones 4.25+ Nodo 18 3.15+ agrupaciones
Python Funciones 2.0+ Python 3.7+ Conjuntos 2.x
Python (modelo de programación v2) Funciones 4.0+ Python 3.7+ 3.15+ agrupaciones
PowerShell Funciones 3.0+ PowerShell 7+ Conjuntos 2.x
Java Funciones 4.0+ Java 8+ Conjuntos 4.x

Importante

En este artículo se usan pestañas para admitir varias versiones del modelo de programación de Node.js. El modelo v4 está disponible de forma general y está diseñado para que los desarrolladores de JavaScript y TypeScript tengan una experiencia más flexible e intuitiva. Para más detalles acerca de cómo funciona el modelo v4, consulte la Guía para desarrolladores de Node.js de Azure Functions. Para obtener más información acerca de las diferencias entre v3 y v4, consulte la Guía de migración.

Importante

En este artículo se usan pestañas para admitir varias versiones del modelo de programación de Python. El modelo v2 está disponible con carácter general y está diseñado para proporcionar una forma más centrada en el código para crear funciones a través de decoradores. Para más información sobre el modelo v2, consulte la guía para desarrolladores de Python de Azure Functions.

Al igual que con Azure Functions, existen plantillas que le ayudarán a desarrollar Durable Functions mediante Visual Studio, Visual Studio Code y Azure Portal.

Patrones de aplicación

El caso de uso principal de Durable Functions es simplificar los complejos requisitos de gestión de estado en aplicaciones sin servidor. Las siguientes secciones describen patrones de aplicación típicos que se pueden beneficiar de Durable Functions:

Patrón nº 1: Encadenamiento de funciones

En el modelo de encadenamiento de funciones, una secuencia de funciones se ejecuta en un orden específico. Con este patrón, la salida de una función se aplica a la entrada de otra función. El uso de colas entre cada función garantiza que el sistema permanezca duradero y escalable, aunque haya un flujo de control de una función a la siguiente.

Diagrama del patrón de encadenamiento de funciones.

Puede utilizar Durable Functions para implementar el patrón de encadenamiento de funciones de forma concisa, como se muestra en el siguiente ejemplo.

En este ejemplo, los valores F1, F2, F3 y F4 son los nombres de otras funciones de la aplicación de funciones. Puede implementar el flujo de control mediante construcciones de código imperativas normales. El código se ejecuta de arriba hacia abajo. El código puede implicar la semántica del flujo de control de lenguaje existente, como condicionales y bucles. Puede incluir lógica de control de errores en bloques try/catch/finally.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

Puede usar el parámetro context para invocar otras funciones por nombre, pasar parámetros y devolver la salida de una función. Cada vez que el código llama a await, el marco de Durable Functions establece puntos de control del progreso de la instancia actual de la función. Si el proceso o la máquina virtual se reciclan a mitad de la ejecución, la instancia de función se reanuda desde la llamada al await anterior. Para obtener más información, vea la sección siguiente, Patrón #2: Fan-out/fan-in.

En este ejemplo, los valores F1, F2, F3 y F4 son los nombres de otras funciones de la aplicación de funciones. Puede implementar el flujo de control mediante construcciones de código imperativas normales. El código se ejecuta de arriba hacia abajo. El código puede implicar la semántica de flujo de control del lenguaje existente, como instrucciones condicionales y bucles. Puede incluir lógica de control de errores en bloques try/catch/finally.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

Puede usar el objeto context.df para invocar otras funciones por nombre, pasar parámetros y devolver la salida de una función. Cada vez que el código llama a yield, el marco de Durable Functions establece puntos de control del progreso de la instancia actual de la función. Si el proceso o la máquina virtual se reciclan a mitad de la ejecución, la instancia de función se reanuda desde la llamada al yield anterior. Para más información, consulte la siguiente sección, Patrón nº 2: Fan-out/fan-in.

Nota

En JavaScript, el objeto context representa todo el contexto de la función. Acceda al contexto de Durable Functions mediante la propiedad df en el contexto principal.

En este ejemplo, los valores F1, F2, F3 y F4 son los nombres de otras funciones de la aplicación de funciones. Puede implementar el flujo de control mediante construcciones de código imperativas normales. El código se ejecuta de arriba hacia abajo. El código puede implicar la semántica de flujo de control del lenguaje ya existente, como condiciones y bucles.

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result

main = df.Orchestrator.create(orchestrator_function)

Puede usar el objeto context para invocar otras funciones por nombre, pasar parámetros y devolver la salida de una función. Cada vez que el código llama a yield, el marco de Durable Functions establece puntos de control del progreso de la instancia actual de la función. Si el proceso o la máquina virtual se reciclan a mitad de la ejecución, la instancia de función se reanuda desde la llamada al yield anterior. Para más información, consulte la siguiente sección, Patrón nº 2: Fan-out/fan-in.

Nota

El objeto context en Python representa el contexto de la orquestación. Acceda al contexto principal de Azure Functions mediante la propiedad function_context del contexto de orquestación.

En este ejemplo, los valores F1, F2, F3 y F4 son los nombres de otras funciones de la aplicación de funciones. Puede implementar el flujo de control mediante construcciones de código imperativas normales. El código se ejecuta de arriba hacia abajo. El código puede implicar la semántica del flujo de control de lenguaje existente, como condicionales y bucles.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

Puede usar el comando Invoke-DurableActivity para invocar otras funciones por nombre, pasar parámetros y devolver la salida de la función. Cada vez que el código llama a Invoke-DurableActivity sin el modificador NoWait, el marco Durable Functions controla el progreso de la instancia de función actual. Si el proceso o la máquina virtual se reciclan a mitad de la ejecución, la instancia de función se reanuda desde la llamada al Invoke-DurableActivity anterior. Para más información, consulte la siguiente sección, Patrón nº 2: Fan-out/fan-in.

En este ejemplo, los valores F1, F2, F3 y F4 son los nombres de otras funciones de la aplicación de funciones. Puede implementar el flujo de control mediante construcciones de código imperativas normales. El código se ejecuta de arriba hacia abajo. El código puede implicar semántica de flujo de control de lenguajes ya existentes, como instrucciones condicionales y bucles.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

Puede usar el objeto ctx para invocar otras funciones por nombre, pasar parámetros y devolver la salida de una función. La salida de estos métodos es un objeto Task<V> donde V es el tipo de dato devuelto por la función invocada. Cada vez que llamas a Task<V>.await(), el marco de trabajo de Durable Functions registra el progreso de la instancia actual de la función. Si el proceso se recicla inesperadamente a mitad de la ejecución, la instancia de la función se reanuda desde la llamada Task<V>.await() anterior. Para más información, consulte la siguiente sección, Patrón nº 2: Fan-out/fan-in.

Patrón nº 2: Distribución de salida/fan-in

El patrón fan-out/fan-in consiste en ejecutar varias funciones en paralelo y después esperar a que todas finalicen. A menudo se realiza algún trabajo de agregación en los resultados devueltos de las funciones.

Diagrama del patrón de distribución ramificada de distribución ramificada.

Con las funciones normales, puedes realizar la distribución ramificada de salida al hacer que la función envíe varios mensajes a una cola. Fanning back in es mucho más difícil. Para realizar el ventilador, en una función normal, se escribe código para realizar un seguimiento cuando finalizan las funciones desencadenadas por la cola y, a continuación, se almacenan las salidas de la función.

La extensión de Durable Functions controla este patrón con código relativamente sencillo:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

El trabajo en fan-out se distribuye en varias instancias de la función F2. Se realiza un seguimiento del trabajo mediante una lista dinámica de las tareas. Task.WhenAll se invoca para esperar a que todas las funciones llamadas terminen. Después, los resultados de la función F2 se agregan desde la lista de tareas dinámica y se pasan a la función F3.

El control automático de puntos de control que se produce en la awaitllamada en Task.WhenAll garantiza que un posible bloqueo o reinicio de medio camino no requiera reiniciar una tarea ya completada.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

El trabajo en fan-out se distribuye en varias instancias de la función F2. Se realiza un seguimiento del trabajo mediante una lista dinámica de las tareas. se llama acontext.df.Task.all API para esperar a que finalicen todas las funciones llamadas. Después, los resultados de la función F2 se agregan desde la lista de tareas dinámica y se pasan a la función F3.

El control automático de puntos de control que se produce en la yieldllamada en context.df.Task.all garantiza que un posible bloqueo o reinicio de medio camino no requiera reiniciar una tarea ya completada.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

El trabajo en fan-out se distribuye en varias instancias de la función F2. Se realiza un seguimiento del trabajo mediante una lista dinámica de las tareas. Se llama a la API context.task_all para esperar a que todas las funciones invocadas finalicen. Después, los resultados de la función F2 se agregan desde la lista de tareas dinámica y se pasan a la función F3.

El control automático de puntos de control que se produce en la yieldllamada en context.task_all garantiza que un posible bloqueo o reinicio de medio camino no requiera reiniciar una tarea ya completada.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

El trabajo en fan-out se distribuye en varias instancias de la función F2. Observe el uso del modificador NoWait en la invocación de la función F2: este modificador permite que el orquestador invoque a F2 sin esperar a que finalice la actividad. Se realiza un seguimiento del trabajo mediante una lista dinámica de las tareas. Se llama al comando Wait-ActivityFunction para esperar a que finalicen todas las funciones llamadas. Después, los resultados de la función F2 se agregan desde la lista de tareas dinámica y se pasan a la función F3.

La creación automática de puntos de control que se produce en la llamada a Wait-ActivityFunction garantiza que cualquier posible bloqueo o reinicio en mitad del proceso no requiera que se reinicien las tareas ya completadas.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

El trabajo en fan-out se distribuye en varias instancias de la función F2. Se realiza un seguimiento del trabajo mediante una lista dinámica de las tareas. ctx.allOf(parallelTasks).await() se llama a para esperar a que finalicen todas las funciones llamadas. A continuación, las salidas de la función F2 se agregan a partir de la lista de tareas dinámica y se devuelven como salida de la función del orquestador.

La comprobación automática que se produce en .await() llamada a ctx.allOf(parallelTasks) garantiza que un reciclaje inesperado del proceso no requiera el reinicio de ninguna tarea ya completada.

Nota

En raras circunstancias, es posible que se produzca un bloqueo en la ventana después de que se complete una función de actividad, pero antes de que se guarde su finalización en el historial de orquestaciones. Si esto sucede, la función de actividad se volvería a ejecutar desde el principio después de que se recupere el proceso.

Patrón nº 3: API HTTP asincrónicas

El patrón de API HTTP asincrónico aborda el problema de coordinar el estado de las operaciones de ejecución prolongada con clientes externos. Una forma habitual de implementar este patrón es que un punto de conexión HTTP desencadene la acción de larga duración. A continuación, redirija el cliente a un punto de conexión de estado que sondea el cliente para obtener información cuando finalice la operación.

Diagrama que muestra el patrón de API HTTP.

Durable Functions proporciona compatibilidad integrada con este patrón, lo que simplifica, o incluso elimina, el código que hay que escribir para interactuar con ejecuciones de funciones de larga duración. Por ejemplo, los ejemplos que se incluyen en el inicio rápido de Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell y Java) muestran un comando de REST simple que se puede usar para iniciar nuevas instancias de funciones del orquestador. Tras iniciar una instancia, la extensión expone las API de HTTP de webhook que consultan el estado de la función del orquestador.

En el ejemplo siguiente se muestran los comandos REST para iniciar un orquestador y consultar su estado. Para que el ejemplo se vea con mayor claridad se omiten algunos detalles del protocolo.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Dado que el entorno en tiempo de ejecución de Durable Functions administra el estado, no es necesario que implemente su propio mecanismo de seguimiento del estado.

La extensión Durable Functions expone las API HTTP integradas que administran orquestaciones de larga duración. También puede implementar este patrón mediante sus propios desencadenadores de función (como HTTP, una cola o Azure Event Hubs) y el enlace de cliente duradero. Por ejemplo, puede usar un mensaje de cola para desencadenar la terminación. O bien, puede usar un desencadenador de HTTP que esté protegido por una directiva de autenticación de Microsoft Entra, en lugar de las API HTTP integradas que usan una clave generada para la autenticación.

Para más información, consulte el artículo acerca de las características de HTTP, en el que se explica cómo se pueden exponer procesos asincrónicos de larga duración a través de HTTP con la extensión Durable Functions.

Patrón nº 4: Supervisión

El patrón de supervisión hace referencia a un proceso flexible y periódico en un flujo de trabajo. Un ejemplo es el sondeo hasta que se cumplan condiciones específicas. Puede usar un desencadenador de temporizador normal para solucionar un escenario simple (como un trabajo de limpieza periódico), pero el intervalo es estático y resulta más difícil administrar los ciclos de vida de las instancias. Puede usar Durable Functions para crear intervalos de periodicidad flexible, administrar el ciclo de vida de las tareas y crear varios procesos de supervisión a partir de una única orquestación.

Un ejemplo del patrón de monitoreo es invertir el escenario previo de la API HTTP asincrónica. En lugar de exponer un punto de conexión para que un cliente externo supervise una operación de larga duración, el monitor de larga duración consume un punto de conexión externo y luego espera un cambio de estado.

Diagrama que muestra el patrón de supervisión.

Con unas cuantas líneas de código, puede utilizar Durable Functions para crear varios monitores que observen puntos de conexión arbitrarios. Los monitores pueden finalizar la ejecución cuando se cumple una condición u otra función puede usar el cliente de orquestación durable para terminar los monitores. Puede cambiar el intervalo de wait del monitor según una condición específica (por ejemplo, retroceso exponencial).

El código siguiente implementa un monitor básico:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

Cuando se recibe una solicitud, se crea una nueva instancia de orquestación para ese identificador de trabajo. La instancia sondea un estado hasta que se cumple una condición o hasta que expire un tiempo de espera. Un temporizador durable controla el intervalo de sondeo. Después, se puede realizar más trabajo o finalizar la orquestación.

Patrón nº 5: Interacción humana

Muchos procesos automatizados implican algún tipo de interacción humana. La intervención humana en un proceso automatizado es más difícil, ya que las personas no tienen la misma alta disponibilidad y capacidad de respuesta que los servicios en la nube. Un proceso automatizado podría permitir esta interacción mediante tiempos de espera y lógica de compensación.

Un proceso de aprobación es un ejemplo de un proceso empresarial que implica la interacción humana. Para un informe de gastos que supera un importe en dólares determinado puede ser necesaria la aprobación de un administrador. Si el administrador no aprueba el informe de gastos en un plazo de 72 horas (puede estar de vacaciones), se inicia un proceso de escalado para obtener la aprobación de otra persona (por ejemplo el jefe del administrador).

Diagrama del patrón de interacción humana.

Puede implementar el patrón en este ejemplo mediante una función de orquestador. El orquestador usa un temporizador durable para solicitar la aprobación, El orquestador se escala si se agota el tiempo de espera. El orquestador espera a que ocurra un evento externo, como una notificación generada por interacción humana.

Estos ejemplos crean un proceso de aprobación para hacer una demostración del patrón de interacción humana:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Para crear el temporizador durable, llame a context.CreateTimer. context.WaitForExternalEvent recibe la notificación. A continuación, Task.WhenAny se llama a para decidir si se debe escalar (el tiempo de espera se produce primero) o procesar la aprobación (la aprobación se recibe antes de que se agote el tiempo de espera).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Para crear el temporizador durable, llame a context.df.createTimer. context.df.waitForExternalEvent recibe la notificación. A continuación, context.df.Task.any se llama a para decidir si se debe escalar (el tiempo de espera se produce primero) o procesar la aprobación (la aprobación se recibe antes de que se agote el tiempo de espera).

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Para crear el temporizador durable, llame a context.create_timer. context.wait_for_external_event recibe la notificación. A continuación, context.task_any se llama a para decidir si se debe escalar (el tiempo de espera se produce primero) o procesar la aprobación (la aprobación se recibe antes de que se agote el tiempo de espera).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Para crear el temporizador durable, llame a Start-DurableTimer. Start-DurableExternalEventListener recibe la notificación. A continuación, Wait-DurableTask se llama a para decidir si se debe escalar (el tiempo de espera se produce primero) o procesar la aprobación (la aprobación se recibe antes de que se agote el tiempo de espera).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

La llamada de método ctx.waitForExternalEvent(...).await() pone en pausa la orquestación hasta que recibe un evento llamado ApprovalEvent, que tiene una carga útil boolean. Si se recibe el evento, se llama a una función de actividad para procesar el resultado de la aprobación. Sin embargo, si no se recibe ningún evento antes de que timeout expire (72 horas), se genera un TaskCanceledException y se llama a la Escalate función de actividad.

Nota

No hay ningún cargo por el tiempo invertido en esperar eventos externos cuando se ejecuta en el plan de consumo.

Un cliente externo puede enviar la notificación de eventos a una función de orquestador en espera mediante las API HTTP integradas:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

También se pueden generar eventos mediante el cliente de orquestación durable desde otra función de la misma aplicación de funciones:

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Patrón nº 6: agregador (entidades con estado)

El sexto patrón se trata de agregar datos de eventos durante un período de tiempo en una solaentidad direccionable. En este patrón, los datos que se agregan pueden provenir de varios orígenes, pueden entregarse en lotes o pueden estar dispersos durante largos períodos de tiempo. Es posible que el agregador tome medidas según datos de eventos a medida que llegan, y puede que los clientes externos necesiten consultar los datos agregados.

Diagrama que muestra un agregador.

El aspecto más difícil de implementar este patrón con funciones normales sin estado es que el control de simultaneidad se convierte en un gran desafío. No solo debe preocuparse de que varios subprocesos modifiquen los mismos datos al mismo tiempo, también debe preocuparse por asegurarse de que el agregador solo se ejecuta en una sola máquina virtual a la vez.

Puede usar entidades duraderas para implementar fácilmente este patrón como una sola función.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Las entidades duraderas también pueden modelarse como clases en .NET. Este modelo puede ser útil si la lista de operaciones es fija y de gran tamaño. El ejemplo siguiente es una implementación equivalente de la entidad Counter que usa clases y métodos .NET.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Nota

Actualmente, las entidades duraderas no se admiten en PowerShell.

Nota

Las entidades duraderas no son compatibles actualmente con Java.

Los clientes pueden poner en cola operaciones para (también conocidos como señalización) una función de entidad mediante el enlace de cliente de entidad.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Nota

Los servidores proxy generados dinámicamente también están disponibles en .NET para indicar entidades de forma segura. Además de la señalización, los clientes también pueden consultar el estado de una función de entidad mediante métodos seguros para tipos en el enlace de cliente de orquestación.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Las funciones de entidad están disponibles en Durable Functions 2.0 y versiones posteriores para C#, JavaScript y Python.

La tecnología

En segundo plano, la extensión Durable Functions se crea con Durable Task Framework, que es una biblioteca de código abierto que se encuentra en GitHub y se usa para crear flujos de trabajo en el código. Así como Azure Functions es la evolución sin servidor de Azure WebJobs, Durable Functions es la evolución sin servidor de Durable Task Framework. Microsoft y otras organizaciones utilizan Durable Task Framework ampliamente para automatizar los procesos críticos. Es una opción natural para el entorno de Azure Functions sin servidor.

Restricciones del código

Con el fin de proporcionar garantías de ejecución prolongada y confiable, las funciones del orquestador tienen un conjunto de reglas de codificación que deben seguirse. Para más información, consulte el Restricciones de código de las funciones de orquestador.

Facturación

Durable Functions se factura igual que Azure Functions. Para más información, consulte los precios de Azure Functions. Al ejecutar funciones de orquestador en el plan de consumo de Azure Functions, hay algunos comportamientos de facturación que se deben tener en cuenta. Para más información acerca de estos comportamientos, consulte el artículo en el que se explica la facturación de Durable Functions.

Saltar hacia la derecha

Puede empezar a trabajar con Durable Functions en menos de 10 minutos completando uno de estos tutoriales de inicio rápido específicos del idioma:

En estas guías de inicio rápido, creará y probará localmente un Hola mundo función durable. Luego, publicará el código de función en Azure. La función que crea organiza y encadena en secuencia llamadas a otras funciones.

Publicaciones

Durable Functions se ha desarrollado en colaboración con Microsoft Research. Como resultado, el equipo de Durable Functions genera activamente documentos y artefactos de investigación; entre estos, se incluyen:

Demostración de vídeo

El siguiente vídeo resalta las ventajas de Durable Functions:

Otras opciones de orquestación

Durable Functions es una extensión avanzada para Azure Functions y es posible que no sea adecuado para todas las aplicaciones. Para obtener una comparación con otras tecnologías de orquestación de Azure, consulte Comparativa entre Azure Functions y Azure Logic Apps.