你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

快速入门:创建 TypeScript Durable Functions 应用

使用 Durable Functions(Azure Functions 功能)在 TypeScript 中编写有状态无服务器工作流。 在本快速入门中,你将克隆并运行一个示例应用,该应用演示了两种常见的编排模式:

  • 函数链式调用:依次调用活动(东京→西雅图→伦敦)。
  • 扇出/扇入:跨五个城市并行调用活动,然后聚合结果。

最终,你将能够借助 Durable Task Scheduler 模拟器在本地运行这两个编排,并在仪表板中查看它们的状态。

  • 克隆并准备 Hello Cities 示例项目。
  • 为本地开发设置持久任务计划程序模拟器和 Azurite。
  • 运行函数应用并触发这两个协调流程。
  • 在 Durable Task Scheduler 仪表板中查看编排状态和输出。

先决条件

设置持久任务计划程序模拟器

Durable Task Scheduler 模拟器提供本地开发环境,使你无需 Azure 订阅即可测试协调流程。 Functions 主机还需要 Azurite 进行本地存储。

启动两个容器:

docker run -d --name dtsemulator -p 8080:8080 -p 8082:8082 \
  mcr.microsoft.com/dts/dts-emulator:latest

docker run -d --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 \
  mcr.microsoft.com/azure-storage/azurite

Tip

模拟器启动后,可在 http://localhost:8082 访问 Durable Task Scheduler 仪表板以监控编排。

运行快速入门示例

  1. 导航到 Hello Cities 示例目录:

    cd samples/durable-functions/typescript/HelloCities
    
  2. 安装依赖项并构建项目:

    npm install
    npm run build
    
  3. 验证 local.settings.json 文件包含以下配置:

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "node",
        "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"
      }
    }
    
  4. 启动函数应用:

    func start
    
  5. 在单独的终端中,触发 函数链式调用 编排:

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartChaining
    $response
    

    响应包含业务流程协调实例的状态 URL。 statusQueryGetUri复制该值并运行该值以检查结果:

    Invoke-RestMethod -Uri $response.statusQueryGetUri
    
  6. 触发扇出/扇入编排:

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartFanOutFanIn
    Invoke-RestMethod -Uri $response.statusQueryGetUri
    

预期输出

POST 请求返回一个包含状态 URL 的 JSON 响应。 例如:

{
  "id": "<instanceId>",
  "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<instanceId>?code=...",
  "sendEventPostUri": "...",
  "terminatePostUri": "...",
  "purgeHistoryDeleteUri": "..."
}

当您查询statusQueryGetUri且该业务流程的runtimeStatusCompleted时,您可以在output字段中找到问候结果。 链式编排返回:

{
  "name": "chainingOrchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
}

扇出/扇入编排返回结果为:

{
  "name": "fanOutFanInOrchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!", "Hello Paris!", "Hello Berlin!"]
}

Tip

如果 runtimeStatus 显示 RunningPending,请稍等片刻,然后再次查询 statusQueryGetUri

打开 Durable Task Scheduler 仪表板 http://localhost:8082 以查看业务流程状态和执行历史记录。

了解代码

此示例使用 Node.js v4 编程模型,其中所有函数都在单个文件中定义(src/functions/helloCities.ts)。

活动函数

sayHello 活动接受一个城市名称并返回一条问候语:

df.app.activity("sayHello", {
  handler: (city: string): string => {
    return `Hello ${city}!`;
  },
});

协调器函数

链式编排器针对三个城市依次调用sayHello

const chainingOrchestrator: OrchestrationHandler = function* (
  context: OrchestrationContext
) {
  const outputs: string[] = [];
  outputs.push(yield context.df.callActivity("sayHello", "Tokyo"));
  outputs.push(yield context.df.callActivity("sayHello", "Seattle"));
  outputs.push(yield context.df.callActivity("sayHello", "London"));
  return outputs;
};
df.app.orchestration("chainingOrchestration", chainingOrchestrator);

扇出/扇入协调程序并行调度活动:

const fanOutFanInOrchestrator: OrchestrationHandler = function* (
  context: OrchestrationContext
) {
  const cities: string[] = ["Tokyo", "Seattle", "London", "Paris", "Berlin"];

  // Fan-out: schedule all activities in parallel
  const tasks = cities.map((city) => context.df.callActivity("sayHello", city));

  // Fan-in: wait for all to complete
  const results: string[] = yield context.df.Task.all(tasks);
  return results;
};
df.app.orchestration("fanOutFanInOrchestration", fanOutFanInOrchestrator);

客户端函数

由 HTTP 触发的客户端函数启动每个协调流程。 例如,链式入门示例:

app.http("StartChaining", {
  route: "StartChaining",
  methods: ["POST"],
  authLevel: "anonymous",
  extraInputs: [df.input.durableClient()],
  handler: async (
    request: HttpRequest,
    context: InvocationContext
  ): Promise<HttpResponse> => {
    const client = df.getClient(context);
    const instanceId = await client.startNew("chainingOrchestration");
    context.log(`Started chaining orchestration with ID = '${instanceId}'.`);
    return client.createCheckStatusResponse(request, instanceId);
  },
});

配置

此示例使用 Durable Task Scheduler 模拟器作为其存储后端。 这将在 host.json 中进行配置:

{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "DurableTask.Core": "Warning"
    }
  },
  "extensions": {
    "durableTask": {
      "hubName": "default",
      "storageProvider": {
        "type": "azureManaged",
        "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

清理资源

完成后停止模拟器容器:

docker stop dtsemulator azurite && docker rm dtsemulator azurite

后续步骤