你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
使用 Durable Functions(Azure Functions 功能)在 Python 中编写有状态无服务器工作流。 在本快速入门中,你将克隆并运行一个用于演示两种常见编排模式的示例应用:
- 函数链式调用:依次调用活动(东京→西雅图→伦敦)。
- 扇出/扇入:跨五个城市并行调用活动,然后聚合结果。
最终,你将能够借助 Durable Task Scheduler 模拟器在本地运行这两个编排,并在仪表板中查看它们的状态。
- 克隆并准备 Hello Cities 示例项目。
- 为本地开发设置持久任务计划程序模拟器和 Azurite。
- 运行函数应用并触发这两个编排。
- 在 Durable Task Scheduler 仪表板中查看编排状态和输出。
先决条件
- 已安装 Python 3.9+。
- Azure Functions Core Tools v4 或更高版本。
- 用于运行模拟器和 Azurite 的 Docker。
- 克隆 Durable Task Scheduler GitHub 存储库 以使用快速入门示例。
设置持久任务计划程序模拟器
Durable Task Scheduler 模拟器提供本地开发环境,使你无需 Azure 订阅即可测试协调流程。 Functions 主机还需要 Azurite 进行本地存储。
启动两个容器:
docker run -d --name dtsemulator -p 8080:8080 -p 8082:8082 \
mcr.microsoft.com/dts/dts-emulator:latest
docker run -d --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 \
mcr.microsoft.com/azure-storage/azurite
Tip
模拟器启动后,可在 http://localhost:8082 访问 Durable Task Scheduler 仪表板以监控编排。
运行快速入门示例
导航到 Hello Cities 示例目录:
cd samples/durable-functions/python/hello-cities创建虚拟环境并安装依赖项:
python -m venv .venv .venv\Scripts\activate pip install -r requirements.txt验证
local.settings.json文件中是否包含以下配置:{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "FUNCTIONS_WORKER_RUNTIME": "python", "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None" } }启动函数应用:
func start在另一个终端中,触发 函数链式调用 编排:
$response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartChaining $response响应包含业务流程协调实例的状态 URL。
statusQueryGetUri复制该值并运行该值以检查结果:Invoke-RestMethod -Uri $response.statusQueryGetUri触发 扇出/扇入 编排:
$response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartFanOutFanIn Invoke-RestMethod -Uri $response.statusQueryGetUri
预期输出
POST 请求返回一个包含状态 URL 的 JSON 响应。 例如:
{
"id": "<instanceId>",
"statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<instanceId>?code=...",
"sendEventPostUri": "...",
"terminatePostUri": "...",
"purgeHistoryDeleteUri": "..."
}
当您查询statusQueryGetUri,且该编排的runtimeStatus为Completed时,可以在output字段中找到问候结果。 链式编排返回:
{
"name": "chaining_orchestration",
"runtimeStatus": "Completed",
"output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
}
扇出/扇入协调器返回:
{
"name": "fan_out_fan_in_orchestration",
"runtimeStatus": "Completed",
"output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!", "Hello Paris!", "Hello Berlin!"]
}
Tip
如果 runtimeStatus 显示 Running 或 Pending,请稍等片刻,然后再次查询 statusQueryGetUri 。
打开 Durable Task Scheduler 仪表板 http://localhost:8082 以查看业务流程状态和执行历史记录。
了解代码
此示例使用带有修饰器的 Python v2 编程模型,其中所有函数都在单个文件中定义(function_app.py)。
活动函数
say_hello 活动接受一个城市名称并返回一条问候语:
@app.activity_trigger(input_name="city")
def say_hello(city: str) -> str:
"""Activity function that returns a greeting for a city."""
logging.info(f"Saying hello to {city}.")
return f"Hello {city}!"
协调器函数
链式编排器依次针对三个城市调用say_hello:
@app.orchestration_trigger(context_name="context")
def chaining_orchestration(context: df.DurableOrchestrationContext):
"""Function chaining orchestration: calls activities sequentially."""
result1 = yield context.call_activity("say_hello", "Tokyo")
result2 = yield context.call_activity("say_hello", "Seattle")
result3 = yield context.call_activity("say_hello", "London")
return [result1, result2, result3]
扇出/扇入协调程序并行调度各项活动:
@app.orchestration_trigger(context_name="context")
def fan_out_fan_in_orchestration(context: df.DurableOrchestrationContext):
"""Fan-out/Fan-in orchestration: calls activities in parallel."""
cities = ["Tokyo", "Seattle", "London", "Paris", "Berlin"]
# Fan-out: schedule all activities in parallel
parallel_tasks = []
for city in cities:
task = context.call_activity("say_hello", city)
parallel_tasks.append(task)
# Fan-in: wait for all to complete
results = yield context.task_all(parallel_tasks)
return results
客户端函数
HTTP 触发的客户端函数会启动每个业务流程协调流程。 例如,链式调用起始器:
@app.route(route="StartChaining", methods=["POST"])
@app.durable_client_input(client_name="client")
async def start_chaining(req: func.HttpRequest, client) -> func.HttpResponse:
"""HTTP trigger to start the function chaining orchestration."""
instance_id = await client.start_new("chaining_orchestration")
logging.info(f"Started chaining orchestration with ID = '{instance_id}'.")
return client.create_check_status_response(req, instance_id)
配置
此示例使用 Durable Task Scheduler 模拟器作为其存储后端。 这将在 host.json 中进行配置:
{
"version": "2.0",
"logging": {
"logLevel": {
"DurableTask.Core": "Warning"
}
},
"extensions": {
"durableTask": {
"hubName": "default",
"storageProvider": {
"type": "azureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
}
}
清理资源
完成后停止模拟器容器:
docker stop dtsemulator azurite && docker rm dtsemulator azurite
若要停用Python虚拟环境,请执行以下操作:
deactivate
后续步骤
- 了解 常见的 Durable Functions 应用模式。
- 了解有关Durable Functions 存储提供程序的信息。