你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
使用 Durable Functions(Azure Functions 功能)在 Java 中编写有状态无服务器工作流。 在本快速入门中,你将克隆并运行一个示例应用,该应用演示了两种常见的编排模式:
- 函数链式调用:依次调用活动(东京→西雅图→伦敦)。
- 扇出/扇入:跨五个城市并行调用活动,然后聚合结果。
最终,你将能够借助 Durable Task Scheduler 模拟器在本地运行这两个编排,并在仪表板中查看它们的状态。
- 克隆并准备 Hello Cities 示例项目。
- 为本地开发设置持久任务计划程序模拟器和 Azurite。
- 构建并运行函数应用,并触发这两个编排。
- 在 Durable Task Scheduler 仪表板中查看编排状态和输出。
先决条件
- 已安装 Java 11+ (JDK)。
- Apache Maven 3.0 或更高版本。
- Azure Functions Core Tools v4 或更高版本。
- 用于运行模拟器和 Azurite 的 Docker。
- 克隆 Durable Task Scheduler GitHub 存储库 以使用快速入门示例。
设置持久任务计划程序模拟器
Durable Task Scheduler 模拟器提供本地开发环境,使你无需 Azure 订阅即可测试协调流程。 Java Functions 主机还需要 Azurite 用于本地存储。
启动两个容器:
docker run -d --name dtsemulator -p 8080:8080 -p 8082:8082 \
mcr.microsoft.com/dts/dts-emulator:latest
docker run -d --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 \
mcr.microsoft.com/azure-storage/azurite
Tip
模拟器启动后,可在 http://localhost:8082 访问 Durable Task Scheduler 仪表板以监控编排。
运行快速入门示例
导航到 Hello Cities 示例目录:
cd samples/durable-functions/java/HelloCities验证
local.settings.json文件包含以下配置:{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "FUNCTIONS_WORKER_RUNTIME": "java", "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None" } }生成项目:
mvn clean package启动函数应用:
mvn azure-functions:run在单独的终端中,触发 函数链式调用 编排:
$response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartChaining $response响应包含业务流程协调实例的状态 URL。
statusQueryGetUri复制该值并运行该值以检查结果:Invoke-RestMethod -Uri $response.statusQueryGetUri触发扇出/扇入编排:
$response = Invoke-RestMethod -Method POST -Uri http://localhost:7071/api/StartFanOutFanIn Invoke-RestMethod -Uri $response.statusQueryGetUri
预期输出
POST 请求返回一个包含状态 URL 的 JSON 响应。 例如:
{
"id": "<instanceId>",
"statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/<instanceId>?code=...",
"sendEventPostUri": "...",
"terminatePostUri": "...",
"purgeHistoryDeleteUri": "..."
}
当您查询statusQueryGetUri且该业务流程的runtimeStatus为Completed时,您可以在output字段中找到问候结果。 链式编排返回:
{
"name": "ChainingOrchestration",
"runtimeStatus": "Completed",
"output": "Hello Tokyo! Hello Seattle! Hello London!"
}
扇出/扇入编排返回结果为:
{
"name": "FanOutFanInOrchestration",
"runtimeStatus": "Completed",
"output": ["Hello Tokyo!", "Hello Seattle!", "Hello London!", "Hello Paris!", "Hello Berlin!"]
}
Tip
如果 runtimeStatus 显示 Running 或 Pending,请稍等片刻,然后再次查询 statusQueryGetUri 。
打开 Durable Task Scheduler 仪表板 http://localhost:8082 以查看业务流程状态和执行历史记录。
了解代码
src/main/java/com/example/Functions.java中的示例项目包含Durable Functions应用所需的所有三种函数类型。
活动函数
SayHello 活动接受一个城市名称并返回一条问候语:
@FunctionName("SayHello")
public String sayHello(
@DurableActivityTrigger(name = "city") String city) {
return "Hello " + city + "!";
}
协调器函数
链式编排器针对三个城市依次调用SayHello:
@FunctionName("ChainingOrchestration")
public String chainingOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String result = "";
result += ctx.callActivity("SayHello", "Tokyo", String.class).await();
result += " " + ctx.callActivity("SayHello", "Seattle", String.class).await();
result += " " + ctx.callActivity("SayHello", "London", String.class).await();
return result;
}
扇出/扇入协调程序并行调度活动:
@FunctionName("FanOutFanInOrchestration")
public List<String> fanOutFanInOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String[] cities = {"Tokyo", "Seattle", "London", "Paris", "Berlin"};
List<Task<String>> parallelTasks = new ArrayList<>();
for (String city : cities) {
parallelTasks.add(ctx.callActivity("SayHello", city, String.class));
}
List<String> results = new ArrayList<>();
for (Task<String> task : parallelTasks) {
results.add(task.await());
}
return results;
}
客户端函数
由 HTTP 触发的客户端函数启动每个编排:
@FunctionName("StartChaining")
public HttpResponseMessage startChaining(
@HttpTrigger(name = "req", methods = {HttpMethod.POST},
authLevel = AuthorizationLevel.ANONYMOUS)
HttpRequestMessage<Void> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance("ChainingOrchestration");
return durableContext.createCheckStatusResponse(request, instanceId);
}
配置
此示例使用 Durable Task Scheduler 模拟器作为其存储后端。 这将在 host.json 中进行配置:
{
"extensions": {
"durableTask": {
"hubName": "default",
"storageProvider": {
"type": "azureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
}
}
}
}
模拟器连接字符串在 local.settings.json 中设置:
{
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "java",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"
}
}
清理资源
完成后停止模拟器容器:
docker stop dtsemulator azurite && docker rm dtsemulator azurite