Поделиться через


Что такое "Устойчивые функции"?

Устойчивые функции — это функция Функции Azure, которая позволяет создавать функции с отслеживанием состояния в бессерверной вычислительной среде. Расширение позволяет определять рабочие процессы с отслеживанием состояния, создавая функции оркестратора, и сущности с отслеживанием состояния, создавая функции сущностей с помощью модели программирования Функций Azure. В фоновом режиме расширение автоматически управляет состоянием, создает контрольные точки и перезагружается, позволяя вам сосредоточиться на бизнес-логике.

Поддерживаемые языки

Устойчивые функции предназначен для работы со всеми Функции Azure языками программирования, но может иметь разные минимальные требования для каждого языка. В следующей таблице показаны минимальные поддерживаемые конфигурации приложений:

Языковой стек Версии среды выполнения службы "Функции Azure" Версия рабочей роли языка Минимальная версия пакетов
.NET / C# / F# Функции 1.0 и более поздней версии Внутрипроцессный процесс
Внепроцессно
Н/Д
JavaScript/TypeScript (модель V3 prog). Функции 2.0 и более поздней версии Node 8 и более поздней версии Пакеты 2.х
JavaScript/TypeScript (модель V4 prog). Функции 4.25+ Узел 18+ 3.15+ пакеты
Python Функции 2.0 и более поздней версии Python 3.7 и более поздней версии Пакеты 2.х
Python (модель prog версии 2) Функции 4.0+ Python 3.7 и более поздней версии 3.15+ пакеты
PowerShell Функции 3.0 и более поздней версии PowerShell 7 и более поздней версии Пакеты 2.х
Java Функции 4.0+ Java 8 и более поздней версии Пакеты 4.х

Внимание

В этой статье используются вкладки для поддержки нескольких версий модели программирования Node.js. Модель версии 4 общедоступна и предназначена для более гибкого и интуитивно понятного интерфейса для разработчиков JavaScript и TypeScript. Дополнительные сведения о том, как работает модель версии 4, см. в руководстве разработчика по Функции Azure Node.js. Дополнительные сведения о различиях между версиями 3 и 4 см. в руководстве по миграции.

Внимание

В этой статье используются вкладки для поддержки нескольких версий модели программирования Python. Модель версии 2 общедоступна и предназначена для более ориентированного на код способа разработки функций с помощью декораторов. Дополнительные сведения о том, как работает модель версии 2, см. в руководстве разработчика Функции Azure Python.

Как и Функции Azure, существуют шаблоны для разработки Устойчивые функции с помощью Visual Studio, Visual Studio Code и портал Azure.

Шаблоны приложений

Основной вариант использования Устойчивых функций — это упрощение комплексных требований координации с отслеживанием состояния в безсерверных приложениях. В следующих разделах описываются типичные шаблоны приложений, в которых можно эффективно использовать Устойчивые функции.

Шаблон 1. Цепочка функций

В шаблоне цепочки функций последовательность функций выполняется в определенном порядке. В этом шаблоне выходные данные одной функции применяются к входным данным другой. Использование очередей между каждой функцией гарантирует, что система остается устойчивой и масштабируемой, даже если существует поток управления от одной функции к следующей.

Схема шаблона цепочки функций

Устойчивые функции можно использовать для реализации шаблона цепочки функций, как показано в следующем примере.

В этом примере значения F1, F2, F3 и F4 являются именами других функций в том же приложении-функции. Поток управления можно реализовать с помощью обычных принудительных конструкций программирования. Код выполняется сверху вниз. Код может включать в себя имеющуюся семантику языка потока управления, такую ​​как условные обозначения и циклы. Вы можете включить логику обработки ошибок в блоках try/catch/finally.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

Вы можете использовать параметр context для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает await, платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова await. Дополнительные сведения см. в следующем разделе: Шаблон #2: выдумка или вентилятор в.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

Вы можете использовать объект context.df для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает yield, платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова yield. Дополнительные сведения см. в следующем разделе: Шаблон #2: выдумка или вентилятор в.

Примечание.

Объект context в JavaScript представляет весь контекст функции. Доступ к контексту Устойчивых функций с помощью свойства df в основном контексте.

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result


main = df.Orchestrator.create(orchestrator_function)

Вы можете использовать объект context для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает yield, платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова yield. Дополнительные сведения см. в следующем разделе: Шаблон #2: выдумка или вентилятор в.

Примечание.

Объект context в Python представляет контекст оркестрации. Получите доступ к основному контексту Функций Azure, используя свойство function_context в контексте оркестрации.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

Вы можете использовать команду Invoke-DurableActivity для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает Invoke-DurableActivity без параметра NoWait, среда Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова Invoke-DurableActivity. Дополнительные сведения см. в следующем разделе: Шаблон #2: выдумка или вентилятор в.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

Вы можете использовать объект ctx для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Выходные данные этих методов — это Task<V> объект, в котором V тип данных, возвращаемых вызываемой функцией. Каждый раз, когда вызывается Task<V>.await(), платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс неожиданно перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова Task<V>.await(). Дополнительные сведения см. в следующем разделе: Шаблон #2: выдумка или вентилятор в.

Шаблон #2: выдумка и вентилятор в

В шаблоне развертывания и объединения вы выполняете параллельно несколько функций, а затем ожидаете их завершения. Часто некоторые агрегирования завершаются по результатам, возвращаемым функциями.

Схема шаблона развертывания и объединения

При использовании обычных функций вы можете выполнить развертывание за счет отправки функцией нескольких сообщений в очередь. Войти обратно гораздо сложнее. Чтобы выполнить объединение в обычной функции, напишите код для отслеживания момента, когда функции, активируемые очередью, завершаются, а затем их выходные значения сохраняются.

Расширение "Устойчивые функции" обрабатывает этот шаблон с помощью относительно простого кода:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

Процесс развертывания распределяется по нескольким экземплярам функции F2. И отслеживается с использованием динамического списка задач. Task.WhenAll вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.

Автоматическое создание контрольных точек, которое происходит при вызове await к Task.WhenAll, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

Процесс развертывания распределяется по нескольким экземплярам функции F2. И отслеживается с использованием динамического списка задач. context.df.Task.all API вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.

Автоматическое создание контрольных точек, которое происходит при вызове yield к context.df.Task.all, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

Процесс развертывания распределяется по нескольким экземплярам функции F2. И отслеживается с использованием динамического списка задач. context.task_all API вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.

Автоматическое создание контрольных точек, которое происходит при вызове yield к context.task_all, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

Процесс развертывания распределяется по нескольким экземплярам функции F2. Обратите внимание, что параметр NoWait при вызове функции F2 позволяет оркестратору продолжать вызов F2 без ожидания завершения действия. И отслеживается с использованием динамического списка задач. Команда Wait-ActivityFunction вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.

Автоматическое создание контрольных точек, которое происходит при вызове Wait-ActivityFunction, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

Процесс развертывания распределяется по нескольким экземплярам функции F2. И отслеживается с использованием динамического списка задач. ctx.allOf(parallelTasks).await() вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из динамического списка задач и возвращаются в виде выходных данных функции оркестратора.

Автоматическое создание контрольных точек, которое происходит при вызове .await() в ctx.allOf(parallelTasks), гарантирует, что неожиданная перезагрузка процесса не потребуют перезапуска уже завершенных задач.

Примечание.

В редких случаях может произойти сбой в окне после завершения функции действия, но до того, как ее выполнение будет сохранено в журнале оркестрации. В этом случае функция действия будет повторно выполнена с начала после восстановления процесса.

Шаблон 3. Асинхронные API-интерфейсы HTTP

Шаблон асинхронного API HTTP решает проблему координации состояния длительных операций с внешними клиентами. Распространенный способ реализации этого шаблона — наличие триггера конечной точки HTTP для выполнения длительного действия. Затем перенаправьте клиент на конечную точку состояния, которую клиент опрашивает для получения сведений о завершении операции.

Схема шаблона API HTTP

Устойчивые функции предоставляют встроенную поддержку для этого шаблона, упрощая или даже удаляя код, который вам нужно написать для взаимодействия с длительными выполнениями функций. Например, в примерах краткого руководства Устойчивые функции (C#, JavaScript, TypeScript, Python, PowerShell и Java) показана простая команда REST, которую можно использовать для запуска новых экземпляров функций оркестратора. После запуска экземпляра расширение предоставляет API HTTP веб-перехватчика, которые запрашивают состояние функции оркестратора.

В следующем примере показаны команды REST, которые запускают оркестратор и запрашивают его состояние. Для ясности в примере опущены некоторые данные протокола.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Так как управление состоянием осуществляется средой выполнения Устойчивых функций, вам не нужно реализовывать собственный механизм отслеживания состояния.

Расширение "Устойчивые функции" предоставляет встроенные API HTTP, которые управляют длительными оркестрациями. Кроме того, этот шаблон можно реализовать самостоятельно с помощью собственных триггеров функций (например, HTTP, очереди или Центры событий Azure) и устойчивой привязки клиента. Например, для активации действия прекращения можно использовать сообщение очереди. Кроме того, можно использовать триггер HTTP, защищенный политикой проверки подлинности Microsoft Entra, вместо встроенных API HTTP, использующих созданный ключ для проверки подлинности.

Дополнительные сведения см. в статье о функциях HTTP, в которой объясняется, как можно предоставлять асинхронные длительные процессы по протоколу HTTP с помощью расширения "Устойчивые функции".

Шаблон #4. Мониторинг

Шаблон мониторинга представляет собой гибкий, повторяющийся процесс в рабочем процессе. Например, повторение опроса, пока не будут выполнены определенные условия. Вы можете использовать регулярный триггер таймера для активации основного сценария, например задание периодической очистки. Но интервал является статическим, что усложняет управление временем существования экземпляра. Вы можете использовать Устойчивые функции, чтобы создать гибкие интервалы повторения, управлять временем существования задачи и создавать несколько процессов мониторинга в рамках одной оркестрации.

Примером шаблона мониторинга является измененный сценарий асинхронных API-интерфейсов HTTP, приведенный ранее. Вместо предоставления конечной точки внешнему клиенту для мониторинга длительной операции, мониторинг использует внешнюю конечную точку, а затем ожидает изменение состояния.

Схема шаблона мониторинга

Благодаря нескольким строкам кода можно использовать Устойчивые функции, чтобы создать несколько мониторингов, которые наблюдают за произвольными конечными точками. Работа мониторингов может приостановиться, если выполнено условие, или другая функция может использовать клиент оркестрации для завершения мониторинга. Можно изменить интервал мониторинга wait в зависимости от определенного условия (например, экспоненциальной задержки).

В следующем коде реализуется простой мониторинг:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

При получении запроса создается экземпляр оркестрации для указанного идентификатора события. Экземпляр опрашивает состояние до тех пор, пока не будет выполнено условие или до истечения времени ожидания. Для управления интервалом опроса используется устойчивый таймер. После этого можно перейти к другим задачам или завершить оркестрацию.

Шаблон 5. Участие пользователя

Многие автоматизированные процессы требуют некоторого участия пользователя. Трудность вовлечения людей в автоматизированный процесс заключается в том, что пользователи не так доступны и не так быстро реагируют, как облачные службы. Автоматизированный процесс должен учитывать это. Для этого используется время ожидания и логика компенсации.

Процесс утверждения является примером бизнес-процесса, который предполагает взаимодействие с пользователями. Утверждение от менеджера может потребоваться для отчета о расходах, превышающих определенную денежную сумму. Если менеджер не утверждает отчет о расходах в течение 72 часов (возможно, менеджер пошел на отпуск), процесс эскалации ударяется, чтобы получить утверждение от кого-то другого (возможно, менеджер менеджера).

Схема шаблона участия пользователя

Этот шаблон в примере можно реализовать с помощью функции оркестратора. Оркестратор использует устойчивый таймер, чтобы запросить одобрение. Оркестратор начинает эскалацию, если истекло время ожидания. Он ожидает внешнее событие в виде уведомления, созданного в результате участия пользователя.

В этих примерах создается процесс утверждения для демонстрации шаблона участия пользователя:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Для создания устойчивого таймера вызовите context.CreateTimer. Уведомление получает context.WaitForExternalEvent. Затем Task.WhenAny вызывается для того, чтобы решить, следует ли ускорить процесс (сначала истечет время ожидания) или утвердить процесс (утверждение получено до истечения времени ожидания).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Для создания устойчивого таймера вызовите context.df.createTimer. Уведомление получает context.df.waitForExternalEvent. Затем context.df.Task.any вызывается для того, чтобы решить, следует ли ускорить процесс (сначала истечет время ожидания) или утвердить процесс (утверждение получено до истечения времени ожидания).

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Для создания устойчивого таймера вызовите context.create_timer. Уведомление получает context.wait_for_external_event. Затем context.task_any вызывается для того, чтобы решить, следует ли ускорить процесс (сначала истечет время ожидания) или утвердить процесс (утверждение получено до истечения времени ожидания).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Для создания устойчивого таймера вызовите Start-DurableTimer. Уведомление получает Start-DurableExternalEventListener. Затем Wait-DurableTask вызывается для того, чтобы решить, следует ли ускорить процесс (сначала истечет время ожидания) или утвердить процесс (утверждение получено до истечения времени ожидания).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

Вызов метода ctx.waitForExternalEvent(...).await() приостанавливает оркестрацию, пока он не получит событие с именем ApprovalEvent, содержащее полезные данные boolean. При получении события вызывается функция действия для обработки результата утверждения. Однако если такое событие не получено до истечения срока действия timeout (72 часа), вызываются TaskCanceledException и функция действия Escalate.

Примечание.

Плата за время, затраченное на ожидание внешних событий при выполнении в плане потребления, не взимается.

Внешний клиент может доставить уведомление о событии в функцию оркестратора, находящуюся в состоянии ожидания, через встроенные интерфейсы API HTTP:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

Событие также можно вызвать с помощью клиента оркестрации устойчивых функций из другой функции в том же приложении-функции:

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Шаблон #6. Агрегатор (сущности с отслеживанием состояния)

Шестой шаблон заключается в статистической обработке данных событий за определенный период времени в одну доступную для адресации сущность. В этом шаблоне агрегированные данные могут поступать из нескольких источников, могут доставляться в пакетах или могут быть разбросаны в течение длительного периода времени. Агрегатор может потребоваться принять меры по обработке данных событий по мере поступления, а внешним клиентам может потребоваться запрашивать агрегированные данные.

Схема агрегатора

Сложности при реализации этого шаблона с обычными функциями без отслеживания состояния заключаются в том, что управление параллелизмом крайне усложняется. Нужно не только беспокоиться о том, что несколько потоков одновременно изменяют одни и те же данные, также важно и то, чтобы агрегатор одновременно выполнялся только на одной виртуальной машине.

Устойчивые сущности можно использовать, чтобы легко реализовать этот шаблон как отдельную функцию.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Устойчивые сущности можно также моделировать как классы в .NET. Эта модель может быть полезной, если список операций является фиксированным и становится большим. Следующий пример представляет собой эквивалентную реализацию сущности Counter с помощью классов и методов .NET.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Примечание.

Устойчивые сущности в настоящее время не поддерживаются в PowerShell.

Примечание.

В настоящее время в Java не поддерживаются устойчивые сущности.

Клиенты могут поставить в очередь операции (называемые также "сигнализацией") для функции сущности, использующей привязку клиента сущности.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Примечание.

Динамически создаваемые прокси-серверы также доступны в .NET для сигнализации сущностям типобезопасным способом. Кроме сигнализации, клиенты также могут запрашивать состояние функции сущности с помощью типобезопасных методов в привязке клиента оркестрации.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Функции сущности доступны в расширении Устойчивые функции 2.0 и более поздних версий для C#, JavaScript и Python.

Технология

Расширение "Устойчивые функции" создано на платформе устойчивых задач, библиотеки с открытым кодом на GitHub для создания рабочих процессов в коде. Подобно тому, как Функции Azure являются бессерверным развитием веб-заданий Azure, Устойчивые функции — это бессерверное развитие платформы устойчивых задач. Платформа устойчивых задач широко используется как корпорацией Майкрософт, так и другими организациями для автоматизации критически важных процессов. Это естественный выбор для бессерверной среды функций Azure.

Ограничения кода

Чтобы обеспечить надежность и гарантировать длительное выполнение, в функциях оркестратора предусмотрен набор правил кода, которым нужно следовать. Дополнительные сведения см. в статье Orchestrator function code constraints (Ограничения кода функции оркестратора).

Выставление счетов

Счета за Устойчивые функции выставляются так же, как и за Функции Azure. Дополнительные сведения см. на странице цен на Функции Azure. При выполнении функций оркестратора в рамках плана потребления Функций Azure необходимо учитывать некоторые особенности при выставлении счетов. Дополнительные сведения об этом см. в статье Durable Functions Billing (Выставление счетов за Устойчивые функции).

Приступайте к работе

Вы можете начать работу с Устойчивыми функциями менее чем за 10 минут, просмотрев одно из следующих кратких руководств по конкретным языкам.

В этих кратких руководствах показано, как локально создать и протестировать устойчивую функцию "Hello world". Затем вы опубликуете код функции в Azure. Функция, которую вы создаете, организовывает и объединяет в цепочку вызовы других функций.

Публикации

Расширение "Устойчивые функции" разработано совместно с Microsoft Research. Команда, работающая над Устойчивыми функциями, активно создает исследовательские статьи и артефакты, включая следующее:

Подробнее

В следующем видео показаны преимущества Устойчивых функций.

Так как Устойчивые функции — это дополнительное расширение решения Функции Azure, оно подходит не для всех приложений. Сравнение с другими технологиями оркестрации Azure см. в разделе Сравнение служб "Функции Azure" и Azure Logic Apps.

Следующие шаги

Durable Functions types and features (Azure Functions) (Типы и возможности Устойчивых функций (Функции Azure))