你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

快速入门:创建 Python Durable Functions 应用

使用 Durable Functions(Azure Functions 功能)在 Python 中编写有状态无服务器工作流。 在本快速入门中,你将克隆并运行一个用于演示两种常见编排模式的示例应用:

  • 函数链式调用:依次调用活动(东京→西雅图→伦敦)。
  • 扇出/扇入:跨五个城市并行调用活动,然后聚合结果。

最终,你将能够借助 Durable Task Scheduler 模拟器在本地运行这两个编排,并在仪表板中查看它们的状态。

  • 克隆并准备 Hello Cities 示例项目。
  • 为本地开发设置持久任务计划程序模拟器和 Azurite。
  • 运行函数应用并触发这两个编排。
  • 在 Durable Task Scheduler 仪表板中查看编排状态和输出。

先决条件

设置持久任务计划程序模拟器

Durable Task Scheduler 模拟器提供本地开发环境,使你无需 Azure 订阅即可测试协调流程。 Functions 主机还需要 Azurite 进行本地存储。

启动两个容器:

docker run -d --name dtsemulator -p 8080:8080 -p 8082:8082 \
  mcr.microsoft.com/dts/dts-emulator:latest

docker run -d --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 \
  mcr.microsoft.com/azure-storage/azurite

Tip

模拟器启动后,可在 http://localhost:8082 访问 Durable Task Scheduler 仪表板以监控编排。

运行快速入门示例

  1. 导航到 Hello Cities 示例目录:

    cd samples/durable-functions/python/hello-cities
    
  2. 创建虚拟环境并安装依赖项:

    python -m venv .venv
    .venv\Scripts\activate
    pip install -r requirements.txt
    
  3. 验证 local.settings.json 文件中是否包含以下配置:

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"
      }
    }
    
  4. 启动函数应用:

    func start
    
  5. 在另一个终端中,触发 函数链式调用 编排:

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartChaining
    $response
    

    响应包含业务流程协调实例的状态 URL。 statusQueryGetUri复制该值并运行该值以检查结果:

    Invoke-RestMethod -Uri $response.statusQueryGetUri
    
  6. 触发 扇出/扇入 编排:

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartFanOutFanIn
    Invoke-RestMethod -Uri $response.statusQueryGetUri
    

预期输出

POST 请求返回一个包含状态 URL 的 JSON 响应。 例如:

{
  "id": "<instanceId>",
  "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<instanceId>?code=...",
  "sendEventPostUri": "...",
  "terminatePostUri": "...",
  "purgeHistoryDeleteUri": "..."
}

当您查询statusQueryGetUri,且该编排的runtimeStatusCompleted时,可以在output字段中找到问候结果。 链式编排返回:

{
  "name": "chaining_orchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
}

扇出/扇入协调器返回:

{
  "name": "fan_out_fan_in_orchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!", "Hello Paris!", "Hello Berlin!"]
}

Tip

如果 runtimeStatus 显示 RunningPending,请稍等片刻,然后再次查询 statusQueryGetUri

打开 Durable Task Scheduler 仪表板 http://localhost:8082 以查看业务流程状态和执行历史记录。

了解代码

此示例使用带有修饰器的 Python v2 编程模型,其中所有函数都在单个文件中定义(function_app.py)。

活动函数

say_hello 活动接受一个城市名称并返回一条问候语:

@app.activity_trigger(input_name="city")
def say_hello(city: str) -> str:
    """Activity function that returns a greeting for a city."""
    logging.info(f"Saying hello to {city}.")
    return f"Hello {city}!"

协调器函数

链式编排器依次针对三个城市调用say_hello

@app.orchestration_trigger(context_name="context")
def chaining_orchestration(context: df.DurableOrchestrationContext):
    """Function chaining orchestration: calls activities sequentially."""
    result1 = yield context.call_activity("say_hello", "Tokyo")
    result2 = yield context.call_activity("say_hello", "Seattle")
    result3 = yield context.call_activity("say_hello", "London")
    return [result1, result2, result3]

扇出/扇入协调程序并行调度各项活动:

@app.orchestration_trigger(context_name="context")
def fan_out_fan_in_orchestration(context: df.DurableOrchestrationContext):
    """Fan-out/Fan-in orchestration: calls activities in parallel."""
    cities = ["Tokyo", "Seattle", "London", "Paris", "Berlin"]

    # Fan-out: schedule all activities in parallel
    parallel_tasks = []
    for city in cities:
        task = context.call_activity("say_hello", city)
        parallel_tasks.append(task)

    # Fan-in: wait for all to complete
    results = yield context.task_all(parallel_tasks)
    return results

客户端函数

HTTP 触发的客户端函数会启动每个业务流程协调流程。 例如,链式调用起始器:

@app.route(route="StartChaining", methods=["POST"])
@app.durable_client_input(client_name="client")
async def start_chaining(req: func.HttpRequest, client) -> func.HttpResponse:
    """HTTP trigger to start the function chaining orchestration."""
    instance_id = await client.start_new("chaining_orchestration")
    logging.info(f"Started chaining orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)

配置

此示例使用 Durable Task Scheduler 模拟器作为其存储后端。 这将在 host.json 中进行配置:

{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "DurableTask.Core": "Warning"
    }
  },
  "extensions": {
    "durableTask": {
      "hubName": "default",
      "storageProvider": {
        "type": "azureManaged",
        "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

清理资源

完成后停止模拟器容器:

docker stop dtsemulator azurite && docker rm dtsemulator azurite

若要停用Python虚拟环境,请执行以下操作:

deactivate

后续步骤