Поделиться через


Обработка внешних событий

Функции Оркестратора могут ожидать и прослушивать внешние события. Эта функция Durable Functions часто полезна для обработки взаимодействия с человеком или других внешних триггеров.

Замечание

Внешние события — это односторонняя асинхронная операция. Они не подходят для ситуаций, когда клиент, отправляющий событие, нуждается в синхронном ответе от функции оркестратора.

Оркестрации могут ждать и прослушивать внешние события. Эта функция часто полезна для обработки взаимодействия с человеком или других внешних триггеров.

Замечание

Внешние события — это односторонняя асинхронная операция. Они не подходят для случаев, когда клиент, отправляющий событие, нуждается в синхронном отклике от оркестрации.

Это важно

В настоящее время пакет SDK для устойчивых задач PowerShell недоступен.

Ожидание событий

API "wait-for-external-event" для привязки триггера оркестрации позволяет функции оркестратора асинхронно ожидать и отслеживать событие, доставленное внешним клиентом. Функция оркестратора прослушивания объявляет имя события и форму получаемых данных .

API wait-for-external-event позволяет оркестрации асинхронно ожидать и прослушивать событие, доставленное внешним клиентом. Прослушивающая оркестрация объявляет имя события и структуру данных, которые она ожидает получить.

Изолированная рабочая модель

using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;

public class BudgetApproval
{
    private readonly ILogger _logger;

    public BudgetApproval(ILoggerFactory loggerFactory)
    {
        _logger = loggerFactory.CreateLogger<BudgetApproval>();
    }

    [Function("BudgetApproval")]
    public async Task Run(
        [OrchestrationTrigger] TaskOrchestrationContext context)
    {
        bool approved = await context.WaitForExternalEventAsync<bool>("Approval");
        if (approved)
        {
            // approval granted - do the approved action
        }
        else
        {
            // approval denied - send a notification
        }
    }
}

Модель внутрипроцессного процесса

[FunctionName("BudgetApproval")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    bool approved = await context.WaitForExternalEvent<bool>("Approval");
    if (approved)
    {
        // approval granted - do the approved action
    }
    else
    {
        // approval denied - send a notification
    }
}

Замечание

Если вы используете Durable Functions 1.x, используйте DurableOrchestrationContext вместо IDurableOrchestrationContext. Дополнительные сведения см. в статье о версиях Durable Functions.

public class BudgetApproval : TaskOrchestrator<object?, bool>
{
    public override async Task<bool> RunAsync(TaskOrchestrationContext context, object? input)
    {
        bool approved = await context.WaitForExternalEvent<bool>("Approval");
        if (approved)
        {
            // approval granted - do the approved action
        }
        else
        {
            // approval denied - send a notification
        }
        return approved;
    }
}

Предыдущий пример прослушивает одно конкретное событие и выполняет действия при его получении.

Вы можете прослушивать несколько событий одновременно, например в следующем примере, которое ожидает одного из трех возможных уведомлений о событиях.

Изолированная рабочая модель

[Function("Select")]
public async Task Run(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    Task<float> event1 = context.WaitForExternalEventAsync<float>("Event1");
    Task<bool> event2 = context.WaitForExternalEventAsync<bool>("Event2");
    Task<int> event3 = context.WaitForExternalEventAsync<int>("Event3");

    Task winner = await Task.WhenAny(event1, event2, event3);
    if (winner == event1)
    {
        // ...
    }
    else if (winner == event2)
    {
        // ...
    }
    else if (winner == event3)
    {
        // ...
    }
}

Модель внутрипроцессного процесса

[FunctionName("Select")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var event1 = context.WaitForExternalEvent<float>("Event1");
    var event2 = context.WaitForExternalEvent<bool>("Event2");
    var event3 = context.WaitForExternalEvent<int>("Event3");

    var winner = await Task.WhenAny(event1, event2, event3);
    if (winner == event1)
    {
        // ...
    }
    else if (winner == event2)
    {
        // ...
    }
    else if (winner == event3)
    {
        // ...
    }
}

Замечание

Использование Durable Functions 1.x? Замените DurableOrchestrationContext на IDurableOrchestrationContext. Дополнительные сведения о других различиях версий см. в статье Durable Functions.

public class SelectOrchestrator : TaskOrchestrator<object?, object?>
{
    public override async Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
    {
        Task<float> event1 = context.WaitForExternalEvent<float>("Event1");
        Task<bool> event2 = context.WaitForExternalEvent<bool>("Event2");
        Task<int> event3 = context.WaitForExternalEvent<int>("Event3");

        Task winner = await Task.WhenAny(event1, event2, event3);
        if (winner == event1)
        {
            // ...
        }
        else if (winner == event2)
        {
            // ...
        }
        else if (winner == event3)
        {
            // ...
        }
        return null;
    }
}

В предыдущем примере прослушивается любое из нескольких событий. Вы также можете ждать всех событий.

Изолированная рабочая модель

[Function("NewBuildingPermit")]
public async Task Run(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    string applicationId = context.GetInput<string>();

    Task gate1 = context.WaitForExternalEventAsync<object>("CityPlanningApproval");
    Task gate2 = context.WaitForExternalEventAsync<object>("FireDeptApproval");
    Task gate3 = context.WaitForExternalEventAsync<object>("BuildingDeptApproval");

    // all three departments must grant approval before a permit can be issued
    await Task.WhenAll(gate1, gate2, gate3);

    await context.CallActivityAsync("IssueBuildingPermit", applicationId);
}

Модель внутрипроцессного процесса

[FunctionName("NewBuildingPermit")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    string applicationId = context.GetInput<string>();

    var gate1 = context.WaitForExternalEvent("CityPlanningApproval");
    var gate2 = context.WaitForExternalEvent("FireDeptApproval");
    var gate3 = context.WaitForExternalEvent("BuildingDeptApproval");

    // all three departments must grant approval before a permit can be issued
    await Task.WhenAll(gate1, gate2, gate3);

    await context.CallActivityAsync("IssueBuildingPermit", applicationId);
}

Замечание

Если вы используете Durable Functions 1.x, используйте DurableOrchestrationContext вместо IDurableOrchestrationContext. Перейдите к Durable Functions версии для полного описания различий версий.

В .NET, если полезные данные события не могут быть преобразованы в ожидаемый тип T, выбрасывается исключение.

public class NewBuildingPermit : TaskOrchestrator<string, object?>
{
    public override async Task<object?> RunAsync(TaskOrchestrationContext context, string applicationId)
    {
        Task<object?> gate1 = context.WaitForExternalEvent<object?>("CityPlanningApproval");
        Task<object?> gate2 = context.WaitForExternalEvent<object?>("FireDeptApproval");
        Task<object?> gate3 = context.WaitForExternalEvent<object?>("BuildingDeptApproval");

        // all three departments must grant approval before a permit can be issued
        await Task.WhenAll(gate1, gate2, gate3);

        await context.CallActivityAsync("IssueBuildingPermit", applicationId);
        return null;
    }
}

В .NET, если полезные данные события не могут быть преобразованы в ожидаемый тип T, выбрасывается исключение.

API "wait-for-external-event" ожидает неограниченное время для некоторых входных данных. Вы можете безопасно выгрузить функциональное приложение во время ожидания. Если и когда событие поступает для этого экземпляра оркестрации, экземпляр просыпается автоматически и немедленно обрабатывает событие.

Замечание

Если приложение-функция использует тарифный план 'По потреблению', плата не взимается, пока функция оркестратора ожидает внешнего события, независимо от времени ожидания.

Как и в случае с функциями действий, внешние события имеют по крайней мере однократную гарантию доставки. Это означает, что при определенных условиях (например, перезапуски, масштабирование, сбои и т. д.) приложение может получать дубликаты одного и того же внешнего события. Поэтому мы рекомендуем, чтобы внешние события содержали идентификатор, который позволяет вручную устранять дубликаты в системах оркестрации.

API "wait-for-external-event" ожидает неограниченное время для некоторых входных данных. Вы можете безопасно остановить процесс во время ожидания. Если и когда событие поступает для этого экземпляра оркестрации, оно просыпается автоматически и немедленно обрабатывает событие.

Внешние события имеют гарантию доставки хотя бы один раз. Это означает, что при определенных условиях (например, перезапуски, масштабирование, сбои и т. д.) приложение может получать дубликаты одного и того же внешнего события. Поэтому мы рекомендуем, чтобы внешние события содержали какой-то идентификатор, который позволяет вручную удалять дубликаты в оркестрациях.

Отправка событий

API raise-event, определенный привязкой клиента оркестровки, можно использовать для отправки внешнего события в оркестровку. Для отправки внешнего события в оркестрацию можно также использовать встроенный HTTP API для генерации события.

Инициированное событие включает instanceID, eventName, и eventData в качестве параметров. Функции оркестратора обрабатывают эти события с помощью wait-for-external-event API. Для того чтобы событие было обработано, eventName должен совпадать как на отправляющей, так и на принимающей стороне. Данные события также должны быть сериализуемыми в формате JSON.

Внутри системы механизмы "raise-event" поставляют в очередь сообщение, которое извлекается ожидающей функцией оркестратора. Если экземпляр не ожидает указанного имени события, сообщение о событии добавляется в очередь в памяти. Если экземпляр оркестрации позже начинает прослушивать это имя события, он проверяет очередь сообщений о событиях.

Замечание

Если экземпляр оркестрации с указанным идентификатором экземпляра отсутствует, сообщение о событии удаляется.

Ниже приведен пример функции с активацией очереди, которая отправляет событие "Утверждение" в экземпляр функции оркестратора. Идентификатор экземпляра оркестрации поступает из текста сообщения очереди.

API "raise-event" можно использовать в клиенте Durable Task для отправки внешнего события в оркестрацию.

Вызванное событие включает идентификатор экземпляра, имя события и данные события в качестве параметров. Оркестрации обрабатывают эти события с помощью API wait-for-external-event . Имя события должно совпадать как с отправкой, так и с получением, чтобы событие было обработано. Данные события также должны быть сериализуемыми в формате JSON.

Внутри системы механизмы "raise-event" помещают сообщение в очередь, которое затем обрабатывается ожидающей оркестрацией. Если экземпляр не ожидает указанного названия события, сообщение о событии добавляется в очередь в памяти. Если экземпляр оркестрации позже начинает прослушивать это имя события, он проверит очередь сообщений о событиях.

Замечание

Если в указанном instanceIDэкземпляре оркестрации нет, сообщение о событии удаляется.

Ниже приведен пример, который отправляет событие "Утверждение" в экземпляр оркестрации.

Изолированная рабочая модель

using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask.Client;

public class ApprovalQueueProcessor
{
    [Function("ApprovalQueueProcessor")]
    public async Task Run(
        [QueueTrigger("approval-queue")] string instanceId,
        [DurableClient] DurableTaskClient client)
    {
        await client.RaiseEventAsync(instanceId, "Approval", true);
    }
}

Модель внутрипроцессного процесса

[FunctionName("ApprovalQueueProcessor")]
public static async Task Run(
    [QueueTrigger("approval-queue")] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    await client.RaiseEventAsync(instanceId, "Approval", true);
}

Замечание

Для Durable Functions 1.x используйте вместо него атрибут OrchestrationClient и тип параметра DurableOrchestrationClient. Ознакомьтесь со статьёй о версиях Durable Functions для всех изменений, специфичных для каждой версии.

Во внутреннем интерфейсе API "raise-event" вложено сообщение, которое извлекается функцией оркестратора ожидания. Если экземпляр не ожидает указанного имени события, сообщение о событии добавляется в буфер в памяти. Если экземпляр оркестрации позже начинает прослушивать это имя события, он проверяет буфер на наличие сообщений о событиях и активирует задачу, ожидающую его.

Замечание

Если экземпляр оркестрации с указанным идентификатором экземпляра отсутствует, сообщение о событии удаляется.

await client.RaiseEventAsync(instanceId, "Approval", true);

Во внутреннем режиме API "raise-event" помещает в очередь сообщение, которое извлекается ожидающей оркестрацией. Если экземпляр не ожидает указанного имени события, сообщение события добавляется в буфер в памяти. Если экземпляр оркестрации позже начинает прослушивать данное имя события, он проверит буфер на сообщения о событиях и активирует задачу, ожидающую его.

Замечание

Если экземпляр оркестрации с указанным идентификатором экземпляра отсутствует, сообщение о событии удаляется.

HTTP

Ниже приведен пример HTTP-запроса, который вызывает Approval событие для экземпляра оркестрации.

POST /runtime/webhooks/durabletask/instances/MyInstanceId/raiseEvent/Approval&code=XXX
Content-Type: application/json

"true"

В этом случае идентификатор экземпляра жестко закодирован как MyInstanceId.

Рекомендации по внешним событиям

При работе с внешними событиями следует учитывать следующие рекомендации.

Использование уникальных имен событий для дедупликации

Внешние события имеют гарантию доставки хотя бы один раз. В некоторых редких условиях (которые могут возникать во время перезапуска, масштабирования или сбоя), приложение может получать дубликаты одного и того же внешнего события. Рекомендуется, чтобы внешние события содержали уникальный идентификатор, который позволяет выполнять дедупликацию вручную в оркестраторах.

Замечание

Поставщик хранилища MSSQL использует внешние события и транзакционно обновляет состояние оркестратора, поэтому, в отличие от поставщика Azure Storage, отсутствует риск дублированных событий. Однако по-прежнему рекомендуется, чтобы внешние события имели уникальные имена, чтобы код был переносимым между бэкендами.

Дальнейшие действия