Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Используйте Устойчивые функции, одну из возможностей Функции Azure, для создания бессерверных рабочих процессов с отслеживанием состояния на Python. В этом кратком руководстве вы клонируете и запускаете пример приложения, в котором демонстрируются два распространенных шаблона оркестрации:
- Цепочка функций: вызывает действия последовательно (Токио → Сиэтл → Лондон).
- Fan-out/fan-in: Параллельно вызывает активности в пяти городах, а затем агрегирует результаты.
В итоге у вас будут локально запущены обе оркестрации с помощью эмулятора Durable Task Scheduler, и вы сможете просматривать их состояние в панели мониторинга.
- Клонируйте и подготовьте пример проекта Hello Cities.
- Настройте эмулятор планировщика устойчивых задач и Azurite для локальной разработки.
- Запустите приложение-функцию и инициируйте обе оркестрации.
- Просмотрите состояние оркестрации и результаты в панели мониторинга Durable Task Scheduler.
Необходимые условия
- Python 3.9+ установлен.
- Функции Azure Core Tools версии 4 или более поздней версии.
- Docker для запуска эмулятора и Azurite.
- Клонируйте репозиторий Durable Task Scheduler на GitHub для работы с примером начального уровня.
Настройте эмулятор планировщика Durable Task
Эмулятор планировщика задач Durable Task Scheduler предоставляет локальную среду разработки, чтобы можно было протестировать оркестрации без подписки Azure. Узел функций также требует Azurite для локального хранилища.
Запустите оба контейнера:
docker run -d --name dtsemulator -p 8080:8080 -p 8082:8082 \
mcr.microsoft.com/dts/dts-emulator:latest
docker run -d --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 \
mcr.microsoft.com/azure-storage/azurite
Tip
После запуска эмулятора вы можете получить доступ к панели мониторинга Durable Task Scheduler по адресу http://localhost:8082, чтобы отслеживать оркестрации.
Запустите пример из краткого руководства
Перейдите в пример каталога Hello Cities:
cd samples/durable-functions/python/hello-citiesСоздайте виртуальную среду и установите необходимые зависимости.
python -m venv .venv .venv\Scripts\activate pip install -r requirements.txtУбедитесь, что
local.settings.jsonфайл содержит следующую конфигурацию:{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "FUNCTIONS_WORKER_RUNTIME": "python", "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None" } }Запустите приложение-функцию:
func startВ отдельном терминале запустите оркестрацию последовательного вызова функций:
$response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartChaining $responseОтвет содержит URL-адреса состояния для экземпляра оркестрации.
statusQueryGetUriСкопируйте значение и запустите его, чтобы проверить результат:Invoke-RestMethod -Uri $response.statusQueryGetUriАктивируйте оркестрацию вентилятора или вентилятора :
$response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartFanOutFanIn Invoke-RestMethod -Uri $response.statusQueryGetUri
Ожидаемые выходные данные
Запрос POST возвращает ответ JSON с URL-адресами состояния. Рассмотрим пример.
{
"id": "<instanceId>",
"statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<instanceId>?code=...",
"sendEventPostUri": "...",
"terminatePostUri": "...",
"purgeHistoryDeleteUri": "..."
}
Когда вы запрашиваете statusQueryGetUri и значение runtimeStatus оркестрации — Completed, результаты приветствия можно найти в поле output. Возвращается оркестрация цепочек:
{
"name": "chaining_orchestration",
"runtimeStatus": "Completed",
"output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
}
Оркестрация fan-out/fan-in возвращает:
{
"name": "fan_out_fan_in_orchestration",
"runtimeStatus": "Completed",
"output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!", "Hello Paris!", "Hello Berlin!"]
}
Tip
Если на runtimeStatus отображается Running или Pending, подождите немного и снова запросите statusQueryGetUri.
Откройте панель мониторинга Durable Task Scheduler по адресу http://localhost:8082, чтобы просмотреть состояние оркестрации и историю выполнения.
Изучение кода
В примере используется модель программирования Python версии 2 с декораторами, где все функции определены в одном файле (function_app.py).
Функция активности
Действие say_hello принимает имя города и возвращает приветствие:
@app.activity_trigger(input_name="city")
def say_hello(city: str) -> str:
"""Activity function that returns a greeting for a city."""
logging.info(f"Saying hello to {city}.")
return f"Hello {city}!"
Функции оркестратора
Оркестратор цепочек последовательно вызывает say_hello для трех городов:
@app.orchestration_trigger(context_name="context")
def chaining_orchestration(context: df.DurableOrchestrationContext):
"""Function chaining orchestration: calls activities sequentially."""
result1 = yield context.call_activity("say_hello", "Tokyo")
result2 = yield context.call_activity("say_hello", "Seattle")
result3 = yield context.call_activity("say_hello", "London")
return [result1, result2, result3]
Оркестратор «fan-out/fan-in» запускает действия параллельно:
@app.orchestration_trigger(context_name="context")
def fan_out_fan_in_orchestration(context: df.DurableOrchestrationContext):
"""Fan-out/Fan-in orchestration: calls activities in parallel."""
cities = ["Tokyo", "Seattle", "London", "Paris", "Berlin"]
# Fan-out: schedule all activities in parallel
parallel_tasks = []
for city in cities:
task = context.call_activity("say_hello", city)
parallel_tasks.append(task)
# Fan-in: wait for all to complete
results = yield context.task_all(parallel_tasks)
return results
Клиентские функции
Клиентские функции, запускаемые по HTTP, запускают каждую оркестрацию. Например, средство запуска цепочки:
@app.route(route="StartChaining", methods=["POST"])
@app.durable_client_input(client_name="client")
async def start_chaining(req: func.HttpRequest, client) -> func.HttpResponse:
"""HTTP trigger to start the function chaining orchestration."""
instance_id = await client.start_new("chaining_orchestration")
logging.info(f"Started chaining orchestration with ID = '{instance_id}'.")
return client.create_check_status_response(req, instance_id)
Конфигурация
В этом примере в качестве механизма хранения используется эмулятор Durable Task Scheduler. Это настроено в host.json:
{
"version": "2.0",
"logging": {
"logLevel": {
"DurableTask.Core": "Warning"
}
},
"extensions": {
"durableTask": {
"hubName": "default",
"storageProvider": {
"type": "azureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
}
}
Очистите ресурсы
Остановите контейнеры эмулятора после завершения:
docker stop dtsemulator azurite && docker rm dtsemulator azurite
Чтобы отключить виртуальную среду Python, выполните следующие действия.
deactivate
Дальнейшие действия
- Узнайте о общих шаблонах приложений Устойчивые функции.
- Узнайте о поставщиках хранилища Устойчивые функции.