共用方式為


快速入門:使用耐用任務 SDK 和耐用任務排程器(預覽)建立應用程式

長期工作 SDK 工具包為 Durable Task 排程器提供輕量型用戶端程式庫。 在本快速入門中,您將瞭解如何建立使用展開式/收合式應用程式模式進行平行處理的流程。

這很重要

目前,長期工作 SDK 不適用於 JavaScript 和 PowerShell。

這很重要

目前,長期工作 SDK 不適用於 JavaScript 和 PowerShell。

  • 設定並執行耐用任務排程器模擬器以進行本機開發。
  • 執行工作者和客戶端專案。
  • 透過長期工作排程器儀錶板檢閱協調流程狀態和歷程記錄。

先決條件

在開始之前:

設定長期工作排程器模擬器

應用程式程式代碼會尋找已部署的排程器和工作中樞資源。 如果找不到任何項目,程式代碼就會回復至模擬器。 模擬器會模擬 Docker 容器中的排程器和工作中樞,使其非常適合本快速入門中所需的本機開發。

  1. Azure-Samples/Durable-Task-Scheduler從根目錄,流覽至 .NET SDK 範例目錄。

    cd samples/durable-task-sdks/dotnet/FanOutFanIn
    
  2. 提取模擬器的 Docker 映像。

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. 執行模擬器。 容器可能需要幾秒鐘的時間才能就緒。

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

由於範例程式代碼會自動使用預設模擬器設定,因此您不需要設定任何環境變數。 本快速入門的預設模擬器設定如下:

  • 端點:http://localhost:8080
  • 工作中樞: default
  1. Azure-Samples/Durable-Task-Scheduler從根目錄,流覽至 Python SDK 範例目錄。

    cd samples/durable-task-sdks/python/fan-out-fan-in
    
  2. 提取模擬器的 Docker 映像。

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. 執行模擬器。 容器可能需要幾秒鐘的時間才能就緒。

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

由於範例程式代碼會自動使用預設模擬器設定,因此您不需要設定任何環境變數。 本快速入門的預設模擬器設定如下:

  • 端點:http://localhost:8080
  • 工作中樞: default
  1. Azure-Samples/Durable-Task-Scheduler從根目錄,流覽至 Java SDK 範例目錄。

    cd samples/durable-task-sdks/java/fan-out-fan-in
    
  2. 提取模擬器的 Docker 映像。

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. 執行模擬器。 容器可能需要幾秒鐘的時間才能就緒。

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

由於範例程式代碼會自動使用預設模擬器設定,因此您不需要設定任何環境變數。 本快速入門的預設模擬器設定如下:

  • 端點:http://localhost:8080
  • 工作中樞: default

執行快速入門

  1. FanOutFanIn目錄中,導航至Worker目錄以建置並執行工作程序。

    cd Worker
    dotnet build
    dotnet run
    
  2. 在另一個終端機中,從 FanOutFanIn 目錄切換到 Client 目錄以建置並執行用戶端。

    cd Client
    dotnet build
    dotnet run
    

了解輸出

當您執行此範例時,您會收到工作程式和客戶端程式的輸出。 分析您執行專案時程式碼中發生的事情。

工作者輸出

工作元件輸出顯示:

  • 協調器和活動的註冊
  • 呼叫每個活動時記錄條目
  • 多個工作專案的平行處理
  • 結果的最終匯總

客戶端輸出

用戶端輸出會顯示:

  • 從工作專案清單開始的協調流程
  • 唯一的編排實例標識碼
  • 最終匯總的結果,顯示每個工作專案及其對應的結果
  • 已處理項目的總計數

範例輸出

Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
  1. 啟動 Python 虛擬環境。

    python -m venv venv
    /venv/Scripts/activate
    
  2. 安裝必要的套件。

    pip install -r requirements.txt
    
  3. 啟動工作程序。

    python worker.py
    

    預期的輸出

    您可以看到輸出,顯示工作執行緒已啟動並正在等候工作項目。

    Starting Fan Out/Fan In pattern worker...
    Using taskhub: default
    Using endpoint: http://localhost:8080
    Starting gRPC worker that connects to http://localhost:8080
    Successfully connected to http://localhost:8080. Waiting for work items...
    
  4. 在新終端機中,啟動虛擬環境並執行用戶端。

    venv/Scripts/activate
    python client.py
    

    您可以提供工作項目的數量作為參數。 如果未提供自變數,則範例預設會執行10個專案。

    python client.py [number_of_items]
    

了解輸出

當您執行此範例時,您會收到工作程式和客戶端程式的輸出。 分析您執行專案時程式碼中發生的事情。

工作者輸出

工作元件輸出顯示:

  • 協調器和活動的註冊。
  • 平行處理每個工作項目時的狀態消息,顯示它們正在同時執行。
  • 每個工作項目的隨機延遲(介於 0.5 到 2 秒之間),以模擬不同的處理時間。
  • 顯示結果匯總的最終訊息。

客戶端輸出

用戶端輸出會顯示:

  • 從指定的工作項目數目開始的協調流程。
  • 唯一的協調流程實例標識碼。
  • 最終匯總的結果,其中包括:
    • 已處理的項目總數
    • 所有結果的總和(每個項目結果都是其值的平方)
    • 所有結果的平均值

範例輸出

Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED

fan-out-fan-in從目錄中,使用 Gradle 建置並執行應用程式。

./gradlew runFanOutFanInPattern

小提示

如果您收到錯誤訊息 zsh: permission denied: ./gradlew,請嘗試在執行應用程式之前執行 chmod +x gradlew

了解輸出

當您執行此範例時,您會收到顯示下列項目的輸出:

  • 協調器和活動的註冊。
  • 平行處理每個工作項目時的狀態消息,顯示它們正在同時執行。
  • 每個工作項目的隨機延遲(介於 0.5 到 2 秒之間),以模擬不同的處理時間。
  • 顯示結果匯總的最終訊息。

分析您執行專案時程式碼中發生的事情。

範例輸出

Starting a Gradle Daemon (subsequent builds will be faster)

> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60

既然您已在本機執行專案,現在可以學習如何將專案部署到以 Azure Container Apps 裝載的 Azure。

檢視協調流程狀態和歷程記錄

您可以透過 長期工作排程器儀錶板來檢視協調流程狀態和歷程記錄。 根據預設,儀錶板會在埠 8082 上執行。

  1. 在網頁瀏覽器中瀏覽至 http://localhost:8082 。
  2. 按一下 預設 工作中樞。 您所建立的協調流程實例位於清單中。
  3. 點擊協調實例 ID 以檢視執行細節,包括:
    • 多個活動工作的平行執行
    • 扇入聚合步驟
    • 每個步驟的輸入和輸出
    • 每個步驟所花費的時間

顯示 .NET 範例協同作業實例詳細資訊的螢幕快照。

顯示 Python 範例編排實例詳細資訊的螢幕快照。

顯示 Java 範例協調流程實例詳細數據的螢幕快照。

瞭解程式代碼結構

工人專案

為了示範 扇出/扇入模式,工作專案流程編排會建立平行活動任務,並等待所有完成。 協調器:

  1. 接受工作專案清單做為輸入。
  2. 使用 ProcessWorkItemActivity 為每個工作項目創建單獨的任務,以分配工作。
  3. 平行執行所有工作。
  4. 使用 Task.WhenAll 等候所有工作完成。
  5. 使用 AggregateResultsActivity 來匯總所有個別結果中的粉絲。
  6. 將最終匯總的結果傳回用戶端。

工作專案包含:

  • ParallelProcessingOrchestration.cs:在單一檔案中定義協調器和活動函式。
  • Program.cs:使用適當的連接字串處理設定背景工作主機。

ParallelProcessingOrchestration.cs

協調程序會使用扇出/扇入的方式,建立平行的活動任務,並等候所有任務完成。

public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
    // Step 1: Fan-out by creating a task for each work item in parallel
    List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();

    foreach (string workItem in workItems)
    {
        // Create a task for each work item (fan-out)
        Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
            nameof(ProcessWorkItemActivity), workItem);
        processingTasks.Add(task);
    }

    // Step 2: Wait for all parallel tasks to complete
    Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

    // Step 3: Fan-in by aggregating all results
    Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
        nameof(AggregateResultsActivity), results);

    return aggregatedResults;
}

每個活動都會實作為以 [DurableTask] 屬性裝飾的個別類別。

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    // Implementation processes a single work item
}

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    // Implementation aggregates individual results
}

Program.cs

工作者使用 Microsoft.Extensions.Hosting 進行適當的生命週期管理。

using Microsoft.Extensions.Hosting;
//..

builder.Services.AddDurableTaskWorker()
    .AddTasks(registry =>
    {
        registry.AddOrchestrator<ParallelProcessingOrchestration>();
        registry.AddActivity<ProcessWorkItemActivity>();
        registry.AddActivity<AggregateResultsActivity>();
    })
    .UseDurableTaskScheduler(connectionString);

用戶端專案

用戶端專案:

  • 使用與工作者相同的連接字串邏輯。
  • 建立要平行處理的工作項目清單。
  • 使用清單做為輸入來排程協調流程實例。
  • 等候協調流程完成,並顯示匯總的結果。
  • 使用 WaitForInstanceCompletionAsync 以達到高效的輪詢。
List<string> workItems = new List<string>
{
    "Task1",
    "Task2",
    "Task3",
    "LongerTask4",
    "VeryLongTask5"
};

// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    "ParallelProcessingOrchestration", 
    workItems);

// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true,
    cts.Token);

worker.py

為了示範 扇出/扇入模式,工作專案流程編排會建立平行活動任務,並等待所有完成。 協調器:

  1. 接收一份工作項目清單作為輸入。
  2. 它透過為每個工作項目建立平行工作來「分散」,(針對每個工作項目呼叫 process_work_item )。
  3. 它會使用 task.when_all 等候所有工作完成。
  4. 然後,它通過匯總結果來整合這些與 aggregate_results 活動相關的結果。
  5. 最終匯總的結果會傳回至用戶端。

協調程序會使用扇出/扇入的方式,建立平行的活動任務,並等候所有任務完成。

# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
    logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")

    # Fan out: Create a task for each work item
    parallel_tasks = []
    for item in work_items:
        parallel_tasks.append(ctx.call_activity("process_work_item", input=item))

    # Wait for all tasks to complete
    logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
    results = yield task.when_all(parallel_tasks)

    # Fan in: Aggregate all the results
    logger.info("All parallel tasks completed, aggregating results")
    final_result = yield ctx.call_activity("aggregate_results", input=results)

    return final_result

client.py

用戶端專案:

  • 使用與工作者相同的連接字串邏輯。
  • 建立要平行處理的工作項目清單。
  • 使用清單做為輸入來排程協調流程實例。
  • 等候協調流程完成,並顯示匯總的結果。
  • 使用 wait_for_orchestration_completion 以達到高效的輪詢。
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))

logger.info(f"Starting new fan out/fan in orchestration with {count} work items")

# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
    "fan_out_fan_in_orchestrator", 
    input=work_items
)

logger.info(f"Started orchestration with ID = {instance_id}")

# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
    instance_id,
    timeout=60
)

為了示範 扇出/扇入模式FanOutFanInPattern 專案調度會建立平行活動工作,並等候所有工作完成。 協調器:

  1. 接受工作專案清單做為輸入。
  2. 使用 `` 為每個工作專案建立個別的工作,以便分散處理。
  3. 平行執行所有工作。
  4. 等待所有任務使用 `` 完成。
  5. 藉由使用 `` 符號來匯總所有個別結果。
  6. 將最終匯總的結果傳回用戶端。

專案包含:

  • DurableTaskSchedulerWorkerExtensions 工作者:定義協調器和活動函式。
  • DurableTaskSchedulerClientExtension client:設定工作主機,正確處理連接字串。

工人

協調程序會使用扇出/扇入的方式,建立平行的活動任務,並等候所有任務完成。

DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "FanOutFanIn_WordCount"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                List<?> inputs = ctx.getInput(List.class);
                List<Task<Integer>> tasks = inputs.stream()
                        .map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
                        .collect(Collectors.toList());
                List<Integer> allWordCountResults = ctx.allOf(tasks).await();
                int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
                ctx.complete(totalWordCount);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "CountWords"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringTokenizer tokenizer = new StringTokenizer(input);
                return tokenizer.countTokens();
            };
        }
    })
    .build();

// Start the worker
worker.start();

客戶

用戶端專案:

  • 使用與工作者相同的連接字串邏輯。
  • 建立要平行處理的工作項目清單。
  • 使用清單做為輸入來排程協調流程實例。
  • 等候協調流程完成,並顯示匯總的結果。
  • 使用 waitForInstanceCompletion 以達到高效的輪詢。
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();

// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog.",
        "If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
        "The greatest glory in living lies not in never falling, but in rising every time we fall.",
        "Always remember that you are absolutely unique. Just like everyone else.");

// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
        "FanOutFanIn_WordCount",
        new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));

後續步驟

現在您已使用長期工作排程器模擬器在本機執行範例,請嘗試建立排程器和工作中樞資源,並部署至 Azure Container Apps。