Быстрый старт: Создание приложения Python Устойчивые функции

Используйте Устойчивые функции, одну из возможностей Функции Azure, для создания бессерверных рабочих процессов с отслеживанием состояния на Python. В этом кратком руководстве вы клонируете и запускаете пример приложения, в котором демонстрируются два распространенных шаблона оркестрации:

  • Цепочка функций: вызывает действия последовательно (Токио → Сиэтл → Лондон).
  • Fan-out/fan-in: Параллельно вызывает активности в пяти городах, а затем агрегирует результаты.

В итоге у вас будут локально запущены обе оркестрации с помощью эмулятора Durable Task Scheduler, и вы сможете просматривать их состояние в панели мониторинга.

  • Клонируйте и подготовьте пример проекта Hello Cities.
  • Настройте эмулятор планировщика устойчивых задач и Azurite для локальной разработки.
  • Запустите приложение-функцию и инициируйте обе оркестрации.
  • Просмотрите состояние оркестрации и результаты в панели мониторинга Durable Task Scheduler.

Необходимые условия

  • Python 3.9+ установлен.
  • Функции Azure Core Tools версии 4 или более поздней версии.
  • Docker для запуска эмулятора и Azurite.
  • Клонируйте репозиторий Durable Task Scheduler на GitHub для работы с примером начального уровня.

Настройте эмулятор планировщика Durable Task

Эмулятор планировщика задач Durable Task Scheduler предоставляет локальную среду разработки, чтобы можно было протестировать оркестрации без подписки Azure. Узел функций также требует Azurite для локального хранилища.

Запустите оба контейнера:

docker run -d --name dtsemulator -p 8080:8080 -p 8082:8082 \
  mcr.microsoft.com/dts/dts-emulator:latest

docker run -d --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 \
  mcr.microsoft.com/azure-storage/azurite

Tip

После запуска эмулятора вы можете получить доступ к панели мониторинга Durable Task Scheduler по адресу http://localhost:8082, чтобы отслеживать оркестрации.

Запустите пример из краткого руководства

  1. Перейдите в пример каталога Hello Cities:

    cd samples/durable-functions/python/hello-cities
    
  2. Создайте виртуальную среду и установите необходимые зависимости.

    python -m venv .venv
    .venv\Scripts\activate
    pip install -r requirements.txt
    
  3. Убедитесь, что local.settings.json файл содержит следующую конфигурацию:

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"
      }
    }
    
  4. Запустите приложение-функцию:

    func start
    
  5. В отдельном терминале запустите оркестрацию последовательного вызова функций:

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartChaining
    $response
    

    Ответ содержит URL-адреса состояния для экземпляра оркестрации. statusQueryGetUri Скопируйте значение и запустите его, чтобы проверить результат:

    Invoke-RestMethod -Uri $response.statusQueryGetUri
    
  6. Активируйте оркестрацию вентилятора или вентилятора :

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartFanOutFanIn
    Invoke-RestMethod -Uri $response.statusQueryGetUri
    

Ожидаемые выходные данные

Запрос POST возвращает ответ JSON с URL-адресами состояния. Рассмотрим пример.

{
  "id": "<instanceId>",
  "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<instanceId>?code=...",
  "sendEventPostUri": "...",
  "terminatePostUri": "...",
  "purgeHistoryDeleteUri": "..."
}

Когда вы запрашиваете statusQueryGetUri и значение runtimeStatus оркестрации — Completed, результаты приветствия можно найти в поле output. Возвращается оркестрация цепочек:

{
  "name": "chaining_orchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
}

Оркестрация fan-out/fan-in возвращает:

{
  "name": "fan_out_fan_in_orchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!", "Hello Paris!", "Hello Berlin!"]
}

Tip

Если на runtimeStatus отображается Running или Pending, подождите немного и снова запросите statusQueryGetUri.

Откройте панель мониторинга Durable Task Scheduler по адресу http://localhost:8082, чтобы просмотреть состояние оркестрации и историю выполнения.

Изучение кода

В примере используется модель программирования Python версии 2 с декораторами, где все функции определены в одном файле (function_app.py).

Функция активности

Действие say_hello принимает имя города и возвращает приветствие:

@app.activity_trigger(input_name="city")
def say_hello(city: str) -> str:
    """Activity function that returns a greeting for a city."""
    logging.info(f"Saying hello to {city}.")
    return f"Hello {city}!"

Функции оркестратора

Оркестратор цепочек последовательно вызывает say_hello для трех городов:

@app.orchestration_trigger(context_name="context")
def chaining_orchestration(context: df.DurableOrchestrationContext):
    """Function chaining orchestration: calls activities sequentially."""
    result1 = yield context.call_activity("say_hello", "Tokyo")
    result2 = yield context.call_activity("say_hello", "Seattle")
    result3 = yield context.call_activity("say_hello", "London")
    return [result1, result2, result3]

Оркестратор «fan-out/fan-in» запускает действия параллельно:

@app.orchestration_trigger(context_name="context")
def fan_out_fan_in_orchestration(context: df.DurableOrchestrationContext):
    """Fan-out/Fan-in orchestration: calls activities in parallel."""
    cities = ["Tokyo", "Seattle", "London", "Paris", "Berlin"]

    # Fan-out: schedule all activities in parallel
    parallel_tasks = []
    for city in cities:
        task = context.call_activity("say_hello", city)
        parallel_tasks.append(task)

    # Fan-in: wait for all to complete
    results = yield context.task_all(parallel_tasks)
    return results

Клиентские функции

Клиентские функции, запускаемые по HTTP, запускают каждую оркестрацию. Например, средство запуска цепочки:

@app.route(route="StartChaining", methods=["POST"])
@app.durable_client_input(client_name="client")
async def start_chaining(req: func.HttpRequest, client) -> func.HttpResponse:
    """HTTP trigger to start the function chaining orchestration."""
    instance_id = await client.start_new("chaining_orchestration")
    logging.info(f"Started chaining orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)

Конфигурация

В этом примере в качестве механизма хранения используется эмулятор Durable Task Scheduler. Это настроено в host.json:

{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "DurableTask.Core": "Warning"
    }
  },
  "extensions": {
    "durableTask": {
      "hubName": "default",
      "storageProvider": {
        "type": "azureManaged",
        "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

Очистите ресурсы

Остановите контейнеры эмулятора после завершения:

docker stop dtsemulator azurite && docker rm dtsemulator azurite

Чтобы отключить виртуальную среду Python, выполните следующие действия.

deactivate

Дальнейшие действия