長期工作 SDK 工具包為 Durable Task 排程器提供輕量型用戶端程式庫。 在本快速入門中,您將瞭解如何建立使用展開式/收合式應用程式模式進行平行處理的流程。
這很重要
目前,長期工作 SDK 不適用於 JavaScript 和 PowerShell。
這很重要
目前,長期工作 SDK 不適用於 JavaScript 和 PowerShell。
- 設定並執行耐用任務排程器模擬器以進行本機開發。
- 執行工作者和客戶端專案。
- 透過長期工作排程器儀錶板檢閱協調流程狀態和歷程記錄。
先決條件
在開始之前:
- 請確定您有 .NET 8 SDK 或更新版本。
- 安裝 Docker 以執行模擬器。
- 複製 長期工作排程器 GitHub 存放庫 ,以使用快速入門範例。
- 請確定您有 Python 3.9+ 或更新版本。
- 安裝 Docker 以執行模擬器。
- 複製 長期工作排程器 GitHub 存放庫 ,以使用快速入門範例。
- 請確定您有 Java 8 或 11。
- 安裝 Docker 以執行模擬器。
- 複製 長期工作排程器 GitHub 存放庫 ,以使用快速入門範例。
設定長期工作排程器模擬器
應用程式程式代碼會尋找已部署的排程器和工作中樞資源。 如果找不到任何項目,程式代碼就會回復至模擬器。 模擬器會模擬 Docker 容器中的排程器和工作中樞,使其非常適合本快速入門中所需的本機開發。
Azure-Samples/Durable-Task-Scheduler
從根目錄,流覽至 .NET SDK 範例目錄。cd samples/durable-task-sdks/dotnet/FanOutFanIn
提取模擬器的 Docker 映像。
docker pull mcr.microsoft.com/dts/dts-emulator:latest
執行模擬器。 容器可能需要幾秒鐘的時間才能就緒。
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
由於範例程式代碼會自動使用預設模擬器設定,因此您不需要設定任何環境變數。 本快速入門的預設模擬器設定如下:
- 端點:
http://localhost:8080
- 工作中樞:
default
Azure-Samples/Durable-Task-Scheduler
從根目錄,流覽至 Python SDK 範例目錄。cd samples/durable-task-sdks/python/fan-out-fan-in
提取模擬器的 Docker 映像。
docker pull mcr.microsoft.com/dts/dts-emulator:latest
執行模擬器。 容器可能需要幾秒鐘的時間才能就緒。
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
由於範例程式代碼會自動使用預設模擬器設定,因此您不需要設定任何環境變數。 本快速入門的預設模擬器設定如下:
- 端點:
http://localhost:8080
- 工作中樞:
default
Azure-Samples/Durable-Task-Scheduler
從根目錄,流覽至 Java SDK 範例目錄。cd samples/durable-task-sdks/java/fan-out-fan-in
提取模擬器的 Docker 映像。
docker pull mcr.microsoft.com/dts/dts-emulator:latest
執行模擬器。 容器可能需要幾秒鐘的時間才能就緒。
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
由於範例程式代碼會自動使用預設模擬器設定,因此您不需要設定任何環境變數。 本快速入門的預設模擬器設定如下:
- 端點:
http://localhost:8080
- 工作中樞:
default
執行快速入門
在
FanOutFanIn
目錄中,導航至Worker
目錄以建置並執行工作程序。cd Worker dotnet build dotnet run
在另一個終端機中,從
FanOutFanIn
目錄切換到Client
目錄以建置並執行用戶端。cd Client dotnet build dotnet run
了解輸出
當您執行此範例時,您會收到工作程式和客戶端程式的輸出。 分析您執行專案時程式碼中發生的事情。
工作者輸出
工作元件輸出顯示:
- 協調器和活動的註冊
- 呼叫每個活動時記錄條目
- 多個工作專案的平行處理
- 結果的最終匯總
客戶端輸出
用戶端輸出會顯示:
- 從工作專案清單開始的協調流程
- 唯一的編排實例標識碼
- 最終匯總的結果,顯示每個工作專案及其對應的結果
- 已處理項目的總計數
範例輸出
Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
啟動 Python 虛擬環境。
python -m venv venv /venv/Scripts/activate
安裝必要的套件。
pip install -r requirements.txt
啟動工作程序。
python worker.py
預期的輸出
您可以看到輸出,顯示工作執行緒已啟動並正在等候工作項目。
Starting Fan Out/Fan In pattern worker... Using taskhub: default Using endpoint: http://localhost:8080 Starting gRPC worker that connects to http://localhost:8080 Successfully connected to http://localhost:8080. Waiting for work items...
在新終端機中,啟動虛擬環境並執行用戶端。
venv/Scripts/activate python client.py
您可以提供工作項目的數量作為參數。 如果未提供自變數,則範例預設會執行10個專案。
python client.py [number_of_items]
了解輸出
當您執行此範例時,您會收到工作程式和客戶端程式的輸出。 分析您執行專案時程式碼中發生的事情。
工作者輸出
工作元件輸出顯示:
- 協調器和活動的註冊。
- 平行處理每個工作項目時的狀態消息,顯示它們正在同時執行。
- 每個工作項目的隨機延遲(介於 0.5 到 2 秒之間),以模擬不同的處理時間。
- 顯示結果匯總的最終訊息。
客戶端輸出
用戶端輸出會顯示:
- 從指定的工作項目數目開始的協調流程。
- 唯一的協調流程實例標識碼。
- 最終匯總的結果,其中包括:
- 已處理的項目總數
- 所有結果的總和(每個項目結果都是其值的平方)
- 所有結果的平均值
範例輸出
Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED
fan-out-fan-in
從目錄中,使用 Gradle 建置並執行應用程式。
./gradlew runFanOutFanInPattern
小提示
如果您收到錯誤訊息 zsh: permission denied: ./gradlew
,請嘗試在執行應用程式之前執行 chmod +x gradlew
。
了解輸出
當您執行此範例時,您會收到顯示下列項目的輸出:
- 協調器和活動的註冊。
- 平行處理每個工作項目時的狀態消息,顯示它們正在同時執行。
- 每個工作項目的隨機延遲(介於 0.5 到 2 秒之間),以模擬不同的處理時間。
- 顯示結果匯總的最終訊息。
範例輸出
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60
既然您已在本機執行專案,現在可以學習如何將專案部署到以 Azure Container Apps 裝載的 Azure。
檢視協調流程狀態和歷程記錄
您可以透過 長期工作排程器儀錶板來檢視協調流程狀態和歷程記錄。 根據預設,儀錶板會在埠 8082 上執行。
- 在網頁瀏覽器中瀏覽至 http://localhost:8082 。
- 按一下 預設 工作中樞。 您所建立的協調流程實例位於清單中。
- 點擊協調實例 ID 以檢視執行細節,包括:
- 多個活動工作的平行執行
- 扇入聚合步驟
- 每個步驟的輸入和輸出
- 每個步驟所花費的時間
瞭解程式代碼結構
工人專案
為了示範 扇出/扇入模式,工作專案流程編排會建立平行活動任務,並等待所有完成。 協調器:
- 接受工作專案清單做為輸入。
- 使用
ProcessWorkItemActivity
為每個工作項目創建單獨的任務,以分配工作。 - 平行執行所有工作。
- 使用
Task.WhenAll
等候所有工作完成。 - 使用
AggregateResultsActivity
來匯總所有個別結果中的粉絲。 - 將最終匯總的結果傳回用戶端。
工作專案包含:
- ParallelProcessingOrchestration.cs:在單一檔案中定義協調器和活動函式。
- Program.cs:使用適當的連接字串處理設定背景工作主機。
ParallelProcessingOrchestration.cs
協調程序會使用扇出/扇入的方式,建立平行的活動任務,並等候所有任務完成。
public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
每個活動都會實作為以 [DurableTask]
屬性裝飾的個別類別。
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
// Implementation processes a single work item
}
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
// Implementation aggregates individual results
}
Program.cs
工作者使用 Microsoft.Extensions.Hosting
進行適當的生命週期管理。
using Microsoft.Extensions.Hosting;
//..
builder.Services.AddDurableTaskWorker()
.AddTasks(registry =>
{
registry.AddOrchestrator<ParallelProcessingOrchestration>();
registry.AddActivity<ProcessWorkItemActivity>();
registry.AddActivity<AggregateResultsActivity>();
})
.UseDurableTaskScheduler(connectionString);
用戶端專案
用戶端專案:
- 使用與工作者相同的連接字串邏輯。
- 建立要平行處理的工作項目清單。
- 使用清單做為輸入來排程協調流程實例。
- 等候協調流程完成,並顯示匯總的結果。
- 使用
WaitForInstanceCompletionAsync
以達到高效的輪詢。
List<string> workItems = new List<string>
{
"Task1",
"Task2",
"Task3",
"LongerTask4",
"VeryLongTask5"
};
// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"ParallelProcessingOrchestration",
workItems);
// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
instanceId,
getInputsAndOutputs: true,
cts.Token);
worker.py
為了示範 扇出/扇入模式,工作專案流程編排會建立平行活動任務,並等待所有完成。 協調器:
- 接收一份工作項目清單作為輸入。
- 它透過為每個工作項目建立平行工作來「分散」,(針對每個工作項目呼叫
process_work_item
)。 - 它會使用
task.when_all
等候所有工作完成。 - 然後,它通過匯總結果來整合這些與
aggregate_results
活動相關的結果。 - 最終匯總的結果會傳回至用戶端。
協調程序會使用扇出/扇入的方式,建立平行的活動任務,並等候所有任務完成。
# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")
# Fan out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity("process_work_item", input=item))
# Wait for all tasks to complete
logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
results = yield task.when_all(parallel_tasks)
# Fan in: Aggregate all the results
logger.info("All parallel tasks completed, aggregating results")
final_result = yield ctx.call_activity("aggregate_results", input=results)
return final_result
client.py
用戶端專案:
- 使用與工作者相同的連接字串邏輯。
- 建立要平行處理的工作項目清單。
- 使用清單做為輸入來排程協調流程實例。
- 等候協調流程完成,並顯示匯總的結果。
- 使用
wait_for_orchestration_completion
以達到高效的輪詢。
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))
logger.info(f"Starting new fan out/fan in orchestration with {count} work items")
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
"fan_out_fan_in_orchestrator",
input=work_items
)
logger.info(f"Started orchestration with ID = {instance_id}")
# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=60
)
為了示範 扇出/扇入模式,FanOutFanInPattern
專案調度會建立平行活動工作,並等候所有工作完成。 協調器:
- 接受工作專案清單做為輸入。
- 使用 `` 為每個工作專案建立個別的工作,以便分散處理。
- 平行執行所有工作。
- 等待所有任務使用 `` 完成。
- 藉由使用 `` 符號來匯總所有個別結果。
- 將最終匯總的結果傳回用戶端。
專案包含:
-
DurableTaskSchedulerWorkerExtensions
工作者:定義協調器和活動函式。 -
DurableTaskSchedulerClientExtension
client:設定工作主機,正確處理連接字串。
工人
協調程序會使用扇出/扇入的方式,建立平行的活動任務,並等候所有任務完成。
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalWordCount);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
.build();
// Start the worker
worker.start();
客戶
用戶端專案:
- 使用與工作者相同的連接字串邏輯。
- 建立要平行處理的工作項目清單。
- 使用清單做為輸入來排程協調流程實例。
- 等候協調流程完成,並顯示匯總的結果。
- 使用
waitForInstanceCompletion
以達到高效的輪詢。
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();
// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
"The greatest glory in living lies not in never falling, but in rising every time we fall.",
"Always remember that you are absolutely unique. Just like everyone else.");
// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));
後續步驟
現在您已使用長期工作排程器模擬器在本機執行範例,請嘗試建立排程器和工作中樞資源,並部署至 Azure Container Apps。