Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Это важно
В настоящее время пакеты SDK для устойчивых задач недоступны для JavaScript и PowerShell.
Это важно
В настоящее время пакеты SDK для устойчивых задач недоступны для JavaScript и PowerShell.
Из этого краткого руководства вы узнаете, как:
- Настройте и запустите эмулятор планировщика устойчивых задач для локальной разработки.
- Запустите рабочие и клиентские проекты.
- Проверьте журналы приложений контейнеров Azure.
- Просмотрите статус и историю оркестрации с помощью панели управления планировщика Durable Task.
Предпосылки
Подготовка к работе:
- Убедитесь, что у вас есть пакет SDK для .NET 8 или более поздней версии.
- Установите Docker для запуска эмулятора.
- Установка Интерфейса командной строки разработчика Azure
- Клонируйте репозиторий планировщика устойчивых задач GitHub , чтобы использовать пример краткого руководства.
- Убедитесь, что у вас есть Python 3.9 или более поздней версии.
- Установите Docker для запуска эмулятора.
- Установка Интерфейса командной строки разработчика Azure
- Клонируйте репозиторий планировщика устойчивых задач GitHub , чтобы использовать пример краткого руководства.
- Убедитесь, что у вас есть Java 8 или 11.
- Установите Docker для запуска эмулятора.
- Установка Интерфейса командной строки разработчика Azure
- Клонируйте репозиторий планировщика устойчивых задач GitHub , чтобы использовать пример краткого руководства.
Подготовка проекта
В новом терминале, из каталога Azure-Samples/Durable-Task-Scheduler
, перейдите в каталог примеров.
cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining
Развертывание с помощью Интерфейса командной строки разработчика Azure
Запустите
azd up
, чтобы подготовить инфраструктуру и развернуть приложение в Приложениях контейнеров Azure в одной команде.azd up
При появлении запроса в терминале укажите следующие параметры.
Параметр Описание Название окружения Префикс группы ресурсов, созданной для хранения всех ресурсов Azure. Расположение Azure Расположение Azure для ваших ресурсов. Подписка на Azure Подписка Azure для ваших ресурсов. Для завершения этого процесса может потребоваться некоторое время. По завершении выполнения команды вывод CLI отображает две ссылки портала Azure для отслеживания хода развертывания. Результаты также демонстрируют, как
azd up
:- Создает и настраивает все необходимые ресурсы Azure с помощью предоставленных Bicep-файлов в каталоге
./infra
с использованиемazd provision
. После того как ресурсы будут подготовлены с помощью Интерфейса командной строки разработчика Azure, вы сможете получить к ним доступ через портал Azure. К файлам, которые подготавливают ресурсы Azure, относятся:main.parameters.json
main.bicep
-
app
Каталог ресурсов, упорядоченный по функциям - Эталонная
core
библиотека, содержащая модули Bicep, используемые шаблономazd
- Развертывание кода с помощью
azd deploy
Ожидаемые выходные данные
Packaging services (azd package) (✓) Done: Packaging service client - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} (✓) Done: Packaging service worker - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} Provisioning Azure resources (azd provision) Provisioning Azure resources can take some time. Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID) Location: West US 2 You can view detailed progress in the Azure Portal: https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s) (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s) (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s) (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s) (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s) Deploying services (azd deploy) (✓) Done: Deploying service client - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/ (✓) Done: Deploying service worker - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/ SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.
- Создает и настраивает все необходимые ресурсы Azure с помощью предоставленных Bicep-файлов в каталоге
Подтверждение успешного развертывания
На портале Azure убедитесь, что оркестрации выполняются успешно.
Скопируйте имя группы ресурсов из выходных данных терминала.
Войдите на портал Azure и найдите имя этой группы ресурсов.
На странице обзора группы ресурсов щелкните ресурс контейнерного приложения клиента.
Выберите мониторинг>поток журнала.
Убедитесь, что контейнер клиента регистрирует задачи по цепочке функций.
Вернитесь на страницу группы ресурсов, чтобы выбрать
worker
контейнер.Выберите мониторинг>поток журнала.
Убедитесь, что рабочий контейнер записывает логи задач цепочки функций.
Убедитесь, что контейнерное приложение регистрирует задачи последовательного выполнения функций.
Общие сведения о коде
Клиентский проект
Проект клиента:
- Использует ту же логику строки подключения, что и рабочий
- Реализует последовательный планировщик оркестрации, который:
- Планируется 20 экземпляров оркестрации, по одному за раз
- Ожидает 5 секунд между планированием каждой оркестрации
- Отслеживает все экземпляры оркестрации в списке
- Ожидает завершения всех оркестраций перед выходом
- Использует стандартное ведение журнала для отображения хода выполнения и результатов
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
// Create a unique instance ID
string instanceName = $"{name}_{i+1}";
// Schedule the orchestration
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"GreetingOrchestration",
instanceName);
// Wait 5 seconds before scheduling the next one
await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}
// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
id, getInputsAndOutputs: false, CancellationToken.None);
}
Рабочий проект
Проект Worker содержит:
- GreetingOrchestration.cs. Определяет функции оркестратора и действия в одном файле
- Program.cs: Настройка рабочего сервера с правильной обработкой строки подключения
Реализация оркестрации
Оркестрация напрямую вызывает каждое действие в последовательности с помощью стандартного CallActivityAsync
метода:
public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
// Step 1: Say hello to the person
string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);
// Step 2: Process the greeting
string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);
// Step 3: Finalize the response
string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);
return finalResponse;
}
Каждое действие реализуется как отдельный класс, украшенный атрибутом [DurableTask]
:
[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
// Implementation details
}
Рабочий использует Microsoft.Extensions.Hosting
для правильного управления жизненным циклом.
var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
.AddTasks(registry => {
registry.AddAllGeneratedTasks();
})
.UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();
Клиент
Проект клиента:
- Использует ту же логику строки подключения, что и рабочий
- Реализует последовательный планировщик оркестрации, который:
- Планируется 20 экземпляров оркестрации, по одному за раз
- Ожидает 5 секунд между планированием каждой оркестрации
- Отслеживает все экземпляры оркестрации в списке
- Ожидает завершения всех оркестраций перед выходом
- Использует стандартное ведение журнала для отображения хода выполнения и результатов
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
try:
# Create a unique instance name
instance_name = f"{name}_{i+1}"
logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")
# Schedule the orchestration
instance_id = client.schedule_new_orchestration(
"function_chaining_orchestrator",
input=instance_name
)
instance_ids.append(instance_id)
logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")
# Wait before scheduling next orchestration (except for the last one)
if i < TOTAL_ORCHESTRATIONS - 1:
logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
try:
logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=120
)
Рабочий
Реализация оркестрации
Оркестрация напрямую вызывает каждое действие в последовательности с помощью стандартной call_activity
функции:
# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
"""Orchestrator that demonstrates function chaining pattern."""
logger.info(f"Starting function chaining orchestration for {name}")
# Call first activity - passing input directly without named parameter
greeting = yield ctx.call_activity('say_hello', input=name)
# Call second activity with the result from first activity
processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)
# Call third activity with the result from second activity
final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)
return final_response
Каждое действие реализуется как отдельная функция:
# Activity functions
def say_hello(ctx, name: str) -> str:
"""First activity that greets the user."""
logger.info(f"Activity say_hello called with name: {name}")
return f"Hello {name}!"
def process_greeting(ctx, greeting: str) -> str:
"""Second activity that processes the greeting."""
logger.info(f"Activity process_greeting called with greeting: {greeting}")
return f"{greeting} How are you today?"
def finalize_response(ctx, response: str) -> str:
"""Third activity that finalizes the response."""
logger.info(f"Activity finalize_response called with response: {response}")
return f"{response} I hope you're doing well!"
Рабочий использует DurableTaskSchedulerWorker
для правильного управления жизненным циклом.
with DurableTaskSchedulerWorker(
host_address=host_address,
secure_channel=endpoint != "http://localhost:8080",
taskhub=taskhub_name,
token_credential=credential
) as worker:
# Register activities and orchestrators
worker.add_activity(say_hello)
worker.add_activity(process_greeting)
worker.add_activity(finalize_response)
worker.add_orchestrator(function_chaining_orchestrator)
# Start the worker (without awaiting)
worker.start()
Пример приложения контейнера содержит рабочий и клиентский код.
Клиент
Код клиента:
- Использует ту же логику строки подключения, что и рабочий
- Реализует последовательный планировщик оркестрации, который:
- Планируется 20 экземпляров оркестрации, по одному за раз
- Ожидает 5 секунд между планированием каждой оркестрации
- Отслеживает все экземпляры оркестрации в списке
- Ожидает завершения всех оркестраций перед выходом
- Использует стандартное ведение журнала для отображения хода выполнения и результатов
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null
? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
: DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();
// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
"ActivityChaining",
new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))
Рабочий
Оркестрация напрямую вызывает каждое действие в последовательности с помощью стандартного callActivity
метода:
DurableTaskGrpcWorker worker = (credential != null
? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
: DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "ActivityChaining"; }
@Override
public TaskOrchestration create() {
return ctx -> {
String input = ctx.getInput(String.class);
String x = ctx.callActivity("Reverse", input, String.class).await();
String y = ctx.callActivity("Capitalize", x, String.class).await();
String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
ctx.complete(z);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "Reverse"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringBuilder builder = new StringBuilder(input);
builder.reverse();
return builder.toString();
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "Capitalize"; }
@Override
public TaskActivity create() {
return ctx -> ctx.getInput(String.class).toUpperCase();
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "ReplaceWhitespace"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
return input.trim().replaceAll("\\s", "-");
};
}
})
.build();
// Start the worker
worker.start();