Поделиться через


Краткое руководство: Размещение приложения SDK для устойчивых задач в приложениях контейнеров Azure (предварительный просмотр)

Это важно

В настоящее время пакеты SDK для устойчивых задач недоступны для JavaScript и PowerShell.

Это важно

В настоящее время пакеты SDK для устойчивых задач недоступны для JavaScript и PowerShell.

Из этого краткого руководства вы узнаете, как:

  • Настройте и запустите эмулятор планировщика устойчивых задач для локальной разработки.
  • Запустите рабочие и клиентские проекты.
  • Проверьте журналы приложений контейнеров Azure.
  • Просмотрите статус и историю оркестрации с помощью панели управления планировщика Durable Task.

Предпосылки

Подготовка к работе:

Подготовка проекта

В новом терминале, из каталога Azure-Samples/Durable-Task-Scheduler, перейдите в каталог примеров.

cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining

Развертывание с помощью Интерфейса командной строки разработчика Azure

  1. Запустите azd up , чтобы подготовить инфраструктуру и развернуть приложение в Приложениях контейнеров Azure в одной команде.

    azd up
    
  2. При появлении запроса в терминале укажите следующие параметры.

    Параметр Описание
    Название окружения Префикс группы ресурсов, созданной для хранения всех ресурсов Azure.
    Расположение Azure Расположение Azure для ваших ресурсов.
    Подписка на Azure Подписка Azure для ваших ресурсов.

    Для завершения этого процесса может потребоваться некоторое время. По завершении выполнения команды вывод CLI отображает две ссылки портала Azure для отслеживания хода развертывания. Результаты также демонстрируют, как azd up:

    • Создает и настраивает все необходимые ресурсы Azure с помощью предоставленных Bicep-файлов в каталоге ./infra с использованием azd provision. После того как ресурсы будут подготовлены с помощью Интерфейса командной строки разработчика Azure, вы сможете получить к ним доступ через портал Azure. К файлам, которые подготавливают ресурсы Azure, относятся:
      • main.parameters.json
      • main.bicep
      • app Каталог ресурсов, упорядоченный по функциям
      • Эталонная core библиотека, содержащая модули Bicep, используемые шаблоном azd
    • Развертывание кода с помощью azd deploy

    Ожидаемые выходные данные

    Packaging services (azd package)
    
    (✓) Done: Packaging service client
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    (✓) Done: Packaging service worker
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    Provisioning Azure resources (azd provision)
    Provisioning Azure resources can take some time.
    
    Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID)
    Location: West US 2
    
     You can view detailed progress in the Azure Portal:
     https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT
    
     (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s)
     (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s)
     (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s)
     (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s)
     (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s)   
    
    Deploying services (azd deploy)
    
     (✓) Done: Deploying service client
     - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/
    
     (✓) Done: Deploying service worker
     - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/
    
    
    SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.   
    

Подтверждение успешного развертывания

На портале Azure убедитесь, что оркестрации выполняются успешно.

  1. Скопируйте имя группы ресурсов из выходных данных терминала.

  2. Войдите на портал Azure и найдите имя этой группы ресурсов.

  3. На странице обзора группы ресурсов щелкните ресурс контейнерного приложения клиента.

  4. Выберите мониторинг>поток журнала.

  1. Убедитесь, что контейнер клиента регистрирует задачи по цепочке функций.

    Снимок экрана: поток журнала контейнера клиента на портале Azure.

  2. Вернитесь на страницу группы ресурсов, чтобы выбрать worker контейнер.

  3. Выберите мониторинг>поток журнала.

  4. Убедитесь, что рабочий контейнер записывает логи задач цепочки функций.

    Снимок экрана: поток журнала рабочего контейнера на портале Azure.

  1. Убедитесь, что контейнерное приложение регистрирует задачи последовательного выполнения функций.

    Скриншот потока журнала примера приложения Java на портале Azure.

Общие сведения о коде

Клиентский проект

Проект клиента:

  • Использует ту же логику строки подключения, что и рабочий
  • Реализует последовательный планировщик оркестрации, который:
    • Планируется 20 экземпляров оркестрации, по одному за раз
    • Ожидает 5 секунд между планированием каждой оркестрации
    • Отслеживает все экземпляры оркестрации в списке
    • Ожидает завершения всех оркестраций перед выходом
  • Использует стандартное ведение журнала для отображения хода выполнения и результатов
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
    // Create a unique instance ID
    string instanceName = $"{name}_{i+1}";

    // Schedule the orchestration
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
        "GreetingOrchestration", 
        instanceName);

    // Wait 5 seconds before scheduling the next one
    await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}

// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
    OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
        id, getInputsAndOutputs: false, CancellationToken.None);
}

Рабочий проект

Проект Worker содержит:

  • GreetingOrchestration.cs. Определяет функции оркестратора и действия в одном файле
  • Program.cs: Настройка рабочего сервера с правильной обработкой строки подключения

Реализация оркестрации

Оркестрация напрямую вызывает каждое действие в последовательности с помощью стандартного CallActivityAsync метода:

public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
    // Step 1: Say hello to the person
    string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);

    // Step 2: Process the greeting
    string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);

    // Step 3: Finalize the response
    string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);

    return finalResponse;
}

Каждое действие реализуется как отдельный класс, украшенный атрибутом [DurableTask] :

[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
    // Implementation details
}

Рабочий использует Microsoft.Extensions.Hosting для правильного управления жизненным циклом.

var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
    .AddTasks(registry => {
        registry.AddAllGeneratedTasks();
    })
    .UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();

Клиент

Проект клиента:

  • Использует ту же логику строки подключения, что и рабочий
  • Реализует последовательный планировщик оркестрации, который:
    • Планируется 20 экземпляров оркестрации, по одному за раз
    • Ожидает 5 секунд между планированием каждой оркестрации
    • Отслеживает все экземпляры оркестрации в списке
    • Ожидает завершения всех оркестраций перед выходом
  • Использует стандартное ведение журнала для отображения хода выполнения и результатов
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
    try:
        # Create a unique instance name
        instance_name = f"{name}_{i+1}"
        logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")

        # Schedule the orchestration
        instance_id = client.schedule_new_orchestration(
            "function_chaining_orchestrator",
            input=instance_name
        )

        instance_ids.append(instance_id)
        logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")

        # Wait before scheduling next orchestration (except for the last one)
        if i < TOTAL_ORCHESTRATIONS - 1:
            logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
        await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
    try:
        logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
        result = client.wait_for_orchestration_completion(
            instance_id,
            timeout=120
        )

Рабочий

Реализация оркестрации

Оркестрация напрямую вызывает каждое действие в последовательности с помощью стандартной call_activity функции:

# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
    """Orchestrator that demonstrates function chaining pattern."""
    logger.info(f"Starting function chaining orchestration for {name}")

    # Call first activity - passing input directly without named parameter
    greeting = yield ctx.call_activity('say_hello', input=name)

    # Call second activity with the result from first activity
    processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)

    # Call third activity with the result from second activity
    final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)

    return final_response

Каждое действие реализуется как отдельная функция:

# Activity functions
def say_hello(ctx, name: str) -> str:
    """First activity that greets the user."""
    logger.info(f"Activity say_hello called with name: {name}")
    return f"Hello {name}!"

def process_greeting(ctx, greeting: str) -> str:
    """Second activity that processes the greeting."""
    logger.info(f"Activity process_greeting called with greeting: {greeting}")
    return f"{greeting} How are you today?"

def finalize_response(ctx, response: str) -> str:
    """Third activity that finalizes the response."""
    logger.info(f"Activity finalize_response called with response: {response}")
    return f"{response} I hope you're doing well!"

Рабочий использует DurableTaskSchedulerWorker для правильного управления жизненным циклом.

with DurableTaskSchedulerWorker(
    host_address=host_address, 
    secure_channel=endpoint != "http://localhost:8080",
    taskhub=taskhub_name, 
    token_credential=credential
) as worker:

    # Register activities and orchestrators
    worker.add_activity(say_hello)
    worker.add_activity(process_greeting)
    worker.add_activity(finalize_response)
    worker.add_orchestrator(function_chaining_orchestrator)

    # Start the worker (without awaiting)
    worker.start()

Пример приложения контейнера содержит рабочий и клиентский код.

Клиент

Код клиента:

  • Использует ту же логику строки подключения, что и рабочий
  • Реализует последовательный планировщик оркестрации, который:
    • Планируется 20 экземпляров оркестрации, по одному за раз
    • Ожидает 5 секунд между планированием каждой оркестрации
    • Отслеживает все экземпляры оркестрации в списке
    • Ожидает завершения всех оркестраций перед выходом
  • Использует стандартное ведение журнала для отображения хода выполнения и результатов
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null 
    ? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();

// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
        "ActivityChaining",
        new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))

Рабочий

Оркестрация напрямую вызывает каждое действие в последовательности с помощью стандартного callActivity метода:

DurableTaskGrpcWorker worker = (credential != null 
    ? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "ActivityChaining"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                String x = ctx.callActivity("Reverse", input, String.class).await();
                String y = ctx.callActivity("Capitalize", x, String.class).await();
                String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
                ctx.complete(z);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Reverse"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringBuilder builder = new StringBuilder(input);
                builder.reverse();
                return builder.toString();
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Capitalize"; }

        @Override
        public TaskActivity create() {
            return ctx -> ctx.getInput(String.class).toUpperCase();
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "ReplaceWhitespace"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                return input.trim().replaceAll("\\s", "-");
            };
        }
    })
    .build();

// Start the worker
worker.start();

Дальнейшие шаги