你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

快速入门:创建 Java Durable Functions 应用

使用 Durable Functions(Azure Functions 功能)在 Java 中编写有状态无服务器工作流。 在本快速入门中,你将克隆并运行一个示例应用,该应用演示了两种常见的编排模式:

  • 函数链式调用:依次调用活动(东京→西雅图→伦敦)。
  • 扇出/扇入:跨五个城市并行调用活动,然后聚合结果。

最终,你将能够借助 Durable Task Scheduler 模拟器在本地运行这两个编排,并在仪表板中查看它们的状态。

  • 克隆并准备 Hello Cities 示例项目。
  • 为本地开发设置持久任务计划程序模拟器和 Azurite。
  • 构建并运行函数应用,并触发这两个编排。
  • 在 Durable Task Scheduler 仪表板中查看编排状态和输出。

先决条件

设置持久任务计划程序模拟器

Durable Task Scheduler 模拟器提供本地开发环境,使你无需 Azure 订阅即可测试协调流程。 Java Functions 主机还需要 Azurite 用于本地存储。

启动两个容器:

docker run -d --name dtsemulator -p 8080:8080 -p 8082:8082 \
  mcr.microsoft.com/dts/dts-emulator:latest

docker run -d --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 \
  mcr.microsoft.com/azure-storage/azurite

Tip

模拟器启动后,可在 http://localhost:8082 访问 Durable Task Scheduler 仪表板以监控编排。

运行快速入门示例

  1. 导航到 Hello Cities 示例目录:

    cd samples/durable-functions/java/HelloCities
    
  2. 验证 local.settings.json 文件包含以下配置:

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "java",
        "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"
      }
    }
    
  3. 生成项目:

    mvn clean package
    
  4. 启动函数应用:

    mvn azure-functions:run
    
  5. 在单独的终端中,触发 函数链式调用 编排:

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartChaining
    $response
    

    响应包含业务流程协调实例的状态 URL。 statusQueryGetUri复制该值并运行该值以检查结果:

    Invoke-RestMethod -Uri $response.statusQueryGetUri
    
  6. 触发扇出/扇入编排:

    $response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartFanOutFanIn
    Invoke-RestMethod -Uri $response.statusQueryGetUri
    

预期输出

POST 请求返回一个包含状态 URL 的 JSON 响应。 例如:

{
  "id": "<instanceId>",
  "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<instanceId>?code=...",
  "sendEventPostUri": "...",
  "terminatePostUri": "...",
  "purgeHistoryDeleteUri": "..."
}

当您查询statusQueryGetUri且该业务流程的runtimeStatusCompleted时,您可以在output字段中找到问候结果。 链式编排返回:

{
  "name": "ChainingOrchestration",
  "runtimeStatus": "Completed",
  "output": "Hello Tokyo! Hello Seattle! Hello London!"
}

扇出/扇入编排返回结果为:

{
  "name": "FanOutFanInOrchestration",
  "runtimeStatus": "Completed",
  "output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!", "Hello Paris!", "Hello Berlin!"]
}

Tip

如果 runtimeStatus 显示 RunningPending,请稍等片刻,然后再次查询 statusQueryGetUri

打开 Durable Task Scheduler 仪表板 http://localhost:8082 以查看业务流程状态和执行历史记录。

了解代码

src/main/java/com/example/Functions.java中的示例项目包含Durable Functions应用所需的所有三种函数类型。

活动函数

SayHello 活动接受一个城市名称并返回一条问候语:

@FunctionName("SayHello")
public String sayHello(
        @DurableActivityTrigger(name = "city") String city) {
    return "Hello " + city + "!";
}

协调器函数

链式编排器针对三个城市依次调用SayHello

@FunctionName("ChainingOrchestration")
public String chainingOrchestration(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {

    String result = "";
    result += ctx.callActivity("SayHello", "Tokyo", String.class).await();
    result += " " + ctx.callActivity("SayHello", "Seattle", String.class).await();
    result += " " + ctx.callActivity("SayHello", "London", String.class).await();
    return result;
}

扇出/扇入协调程序并行调度活动:

@FunctionName("FanOutFanInOrchestration")
public List<String> fanOutFanInOrchestration(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {

    String[] cities = {"Tokyo", "Seattle", "London", "Paris", "Berlin"};
    List<Task<String>> parallelTasks = new ArrayList<>();

    for (String city : cities) {
        parallelTasks.add(ctx.callActivity("SayHello", city, String.class));
    }

    List<String> results = new ArrayList<>();
    for (Task<String> task : parallelTasks) {
        results.add(task.await());
    }

    return results;
}

客户端函数

由 HTTP 触发的客户端函数启动每个编排:

@FunctionName("StartChaining")
public HttpResponseMessage startChaining(
        @HttpTrigger(name = "req", methods = {HttpMethod.POST},
            authLevel = AuthorizationLevel.ANONYMOUS)
            HttpRequestMessage<Void> request,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    String instanceId = client.scheduleNewOrchestrationInstance("ChainingOrchestration");
    return durableContext.createCheckStatusResponse(request, instanceId);
}

配置

此示例使用 Durable Task Scheduler 模拟器作为其存储后端。 这将在 host.json 中进行配置:

{
  "extensions": {
    "durableTask": {
      "hubName": "default",
      "storageProvider": {
        "type": "azureManaged",
        "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
      }
    }
  }
}

模拟器连接字符串在 local.settings.json 中设置:

{
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "java",
    "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"
  }
}

清理资源

完成后停止模拟器容器:

docker stop dtsemulator azurite && docker rm dtsemulator azurite

后续步骤