Поделиться через


Цепочка функций

Цепочка функций — это паттерн, при котором выполняется последовательность функций в заданном порядке. Обычно выходные данные одной функции передаются в входные данные следующей функции. В этой статье описывается последовательность вызовов, которая создается при выполнении руководства по началу работы с Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell или Java). Дополнительные сведения см. в обзоре Durable Functions.

Предварительные условия

Цепочка функций — это шаблон, в котором выполняется последовательность действий в порядке. Обычно выходные данные одного действия передаются в входные данные следующего. В этой статье описывается последовательность вызовов для SDK Durable Task для .NET, JavaScript, Python и Java.

Функции

В этой статье описаны эти функции в примере приложения:

  • E1_HelloSequence: функция оркестратора , которая вызывает E1_SayHello несколько раз в последовательности. Он сохраняет все выходные данные и записывает результаты.
  • E1_SayHello: функция действия , которая добавляет "Hello" в начало строки.
  • HttpStart: функция устойчивого клиента , активироваемая HTTP, которая запускает экземпляр оркестратора.

В этой статье описываются эти компоненты в примере приложения:

  • GreetingOrchestration, greetingOrchestrator, function_chaining_orchestrator или ActivityChaining: оркестратор, который вызывает несколько действия по порядку. Он сохраняет все выходные данные и записывает результаты.
  • Функции действий: действия, которые обрабатывают входные данные и возвращают результаты. Каждое действие выполняет простое преобразование входных данных.
  • Клиент: клиентское приложение, которое запускает экземпляр оркестратора и ожидает результата.

Orchestrator

[FunctionName("E1_HelloSequence")]
public static async Task<List<string>> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var outputs = new List<string>();

    outputs.Add(await context.CallActivityAsync<string>("E1_SayHello", "Tokyo"));
    outputs.Add(await context.CallActivityAsync<string>("E1_SayHello", "Seattle"));
    outputs.Add(await context.CallActivityAsync<string>("E1_SayHello_DirectInput", "London"));

    // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
    return outputs;
}

Все функции оркестрации C# должны иметь параметр типа DurableOrchestrationContext, который существует в сборке Microsoft.Azure.WebJobs.Extensions.DurableTask. Этот объект контекста позволяет вызывать другие функции действия и передавать им входные параметры с помощью метода CallActivityAsync.

Этот код трижды последовательно вызывает E1_SayHello с разными значениями параметров. Значение, возвращаемое при каждом вызове, добавляется в список outputs, который возвращается в конце функции.

В этом коде показан оркестратор, который вызывает три действия в последовательности и передает каждый вывод в следующее действие:

using System.Threading.Tasks;
using Microsoft.DurableTask;

[DurableTask]
public class GreetingOrchestration : TaskOrchestrator<string, string>
{
    public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
    {
        // Step 1: Say hello to the person
        string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);

        // Step 2: Process the greeting
        string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);

        // Step 3: Finalize the response
        string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);

        return finalResponse;
    }
}

Все оркестраторы .NET наследуются от TaskOrchestrator<TInput, TOutput>. Элемент TaskOrchestrationContext позволяет вызывать действия с помощью CallActivityAsync. Код вызывает три действия в последовательности, где каждое действие получает выходные данные предыдущего действия.

Activity

[FunctionName("E1_SayHello")]
public static string SayHello([ActivityTrigger] IDurableActivityContext context)
{
    string name = context.GetInput<string>();
    return $"Hello {name}!";
}

Действия используют атрибут ActivityTrigger. Используйте IDurableActivityContext для выполнения действий, таких как чтение входных данных с помощью GetInput<T>.

E1_SayHello форматирует строку приветствия.

Вместо привязки к IDurableActivityContext, привязывайтесь напрямую к типу, передаваемому в функцию. Например:

[FunctionName("E1_SayHello_DirectInput")]
public static string SayHelloDirectInput([ActivityTrigger] string name)
{
    return $"Hello {name}!";
}

Действия в пакете SDK для устойчивых задач наследуются от TaskActivity<TInput, TOutput>:

using System.Threading.Tasks;
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;

[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
    private readonly ILogger<SayHelloActivity> _logger;

    public SayHelloActivity(ILogger<SayHelloActivity> logger)
    {
        _logger = logger;
    }

    public override Task<string> RunAsync(TaskActivityContext context, string name)
    {
        _logger.LogInformation("Activity SayHello called with name: {Name}", name);
        return Task.FromResult($"Hello {name}!");
    }
}

[DurableTask]
public class ProcessGreetingActivity : TaskActivity<string, string>
{
    public override Task<string> RunAsync(TaskActivityContext context, string greeting)
    {
        return Task.FromResult($"{greeting} How are you today?");
    }
}

[DurableTask]
public class FinalizeResponseActivity : TaskActivity<string, string>
{
    public override Task<string> RunAsync(TaskActivityContext context, string response)
    {
        return Task.FromResult($"{response} I hope you're doing well!");
    }
}

Используйте внедрение зависимостей для получения таких сервисов, как ILogger. [DurableTask] Добавьте атрибут для регистрации действия в рабочей роли.

Клиент

Запустите экземпляр функции оркестратора из клиентской функции. Используйте функцию HttpStart, запускаемую по HTTP-запросу, для создания экземпляров E1_HelloSequence.

public static class HttpStart
{
    [FunctionName("HttpStart")]
    public static async Task<HttpResponseMessage> Run(
        [HttpTrigger(AuthorizationLevel.Function, methods: "post", Route = "orchestrators/{functionName}")] HttpRequestMessage req,
        [DurableClient] IDurableClient starter,
        string functionName,
        ILogger log)
    {
        // Function input comes from the request content.
        object eventData = await req.Content.ReadAsAsync<object>();
        string instanceId = await starter.StartNewAsync(functionName, eventData);

        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

        return starter.CreateCheckStatusResponse(req, instanceId);
    }
}

Чтобы взаимодействовать с оркестраторами, добавьте входную привязку DurableClient . Используйте клиент для запуска оркестрации и возврата HTTP-ответа, включающего URL-адреса для проверки состояния новой оркестрации.

Запустите оркестрацию из клиентского приложения. Клиент планирует оркестрацию и может ждать завершения.

using System;
using Microsoft.DurableTask.Client;

// Create the client
var client = DurableTaskClientBuilder.UseDurableTaskScheduler(connectionString).Build();

// Schedule a new orchestration instance
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    nameof(GreetingOrchestration),
    input: "World");

Console.WriteLine($"Started orchestration with ID: {instanceId}");

// Wait for the orchestration to complete
OrchestrationMetadata result = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true);

Console.WriteLine($"Orchestration completed with result: {result.ReadOutputAs<string>()}");

Создайте DurableTaskClient с помощью строки подключения к планировщику устойчивых задач. Используйте ScheduleNewOrchestrationInstanceAsync для запуска оркестрации и WaitForInstanceCompletionAsync для ожидания завершения.

Запустите пример

Чтобы запустить E1_HelloSequence оркестрацию, отправьте этот POST-запрос HTTP в функцию HttpStart.

POST http://{host}/orchestrators/E1_HelloSequence

Примечание.

Предыдущий фрагмент кода HTTP предполагает, что в файле host.json образца удален стандартный префикс api/ из всех URL-адресов функций HTTP-триггеров. Найдите эту конфигурацию в файлеhost.json примера.

Например, если вы используете пример в приложении-функции с именем myfunctionapp, замените {host} на myfunctionapp.azurewebsites.net.

Запрос возвращает HTTP 202 (обрезанный для краткости):

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/96924899c16d43b08a536de376ac786b?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Очередь оркестрации формируется и немедленно запускается. Используйте URL-адрес в заголовке Location , чтобы проверить состояние выполнения.

GET http://{host}/runtime/webhooks/durabletask/instances/96924899c16d43b08a536de376ac786b?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

Ответ показывает состояние оркестрации. Так как это происходит быстро, экземпляр часто находится в состоянии «Завершено» и возвращает ответ в сокращённом виде.

HTTP/1.1 200 OK
Content-Length: 179
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":null,"output":["Hello Tokyo!","Hello Seattle!","Hello London!"],"createdTime":"2017-06-29T05:24:57Z","lastUpdatedTime":"2017-06-29T05:24:59Z"}

Экземпляр runtimeStatusзавершен и output содержит сериализованный в формате JSON результат выполнения функции оркестратора.

Примечание.

Реализуйте аналогичную логику запуска для других типов триггеров, например queueTrigger, eventHubTriggerили timerTrigger.

Просмотрите журналы выполнения функции. Функция E1_HelloSequence запускается и завершается несколько раз из-за поведения воспроизведения, описанного в устойчивости работы оркестрации. Но E1_SayHello запускается только три раза, так как вызовы функции действия не повторяются.

Чтобы запустить пример, вам потребуется:

  1. Запустите эмулятор планировщика устойчивых задач (для локальной разработки):

    docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
    
  2. Запустите рабочий процесс , чтобы зарегистрировать оркестратор и операции.

  3. Запустите клиент , чтобы запланировать оркестрацию и дождаться результата.

Вывод клиента демонстрирует результат последовательной оркестрации:

Started orchestration with ID: abc123
Orchestration completed with result: "Hello World! How are you today? I hope you're doing well!"

Журналы рабочих ролей показывают, что каждое действие выполняется в последовательности и передает выходные данные в следующее действие.

Следующие шаги

В этом примере демонстрируется простая оркестрация цепочки функций. Далее реализуйте шаблон масштабирования fan-out/fan-in.

В этом примере демонстрируется простая оркестрация цепочки функций. Далее изучите дополнительные шаблоны.

Полные примеры пакета SDK javaScript см. в примерах пакета SDK для JavaScript для устойчивых задач.