Hızlı Başlangıç: Dayanıklı Görev SDK'ları ve Dayanıklı Görev Zamanlayıcı ile uygulama oluşturma

Dayanıklı Görev SDK'ları Dayanıklı Görev Zamanlayıcı için basit bir istemci kitaplığı sağlar. Bu hızlı başlangıçta, birden çok iş öğesini paralel olarak işlemek için fan-out/fan-in uygulama desenini kullanan bir örnek uygulama oluşturacak ve ardından sonuçları toplaacaksınız.

Bu hızlı başlangıcın sonunda, görevleri paralel çalışanlara dağıtan ve sonuçları bir araya getiren, tamamı Dayanıklı Görev Zamanlayıcı öykünücüsü ile yerel olarak çalışan işleyen bir orkestrasyonunuz olacak.

Önemli

Şu anda PowerShell Dayanıklı Görev SDK'sı kullanılamıyor.

Note

Dayanıklı Görev SDK'ları için PowerShell desteği önizleme aşamasındadır. Tam hızlı başlangıç deneyimi için yukarıda farklı bir dil sekmesi seçin (C#, Python, Java veya JavaScript).

  • Yerel geliştirme için Dayanıklı Görev Zamanlayıcı öykünücüsünü ayarlayın ve çalıştırın.
  • Çalışan ve istemci projelerini çalıştırın.
  • Dayanıklı Görev Zamanlayıcı panosu aracılığıyla düzenleme durumunu ve geçmişini gözden geçirin.

Önkoşullar

Başlamadan önce:

Dayanıklı Görev Zamanlayıcı öykünücüsünü ayarlayın

Öykünücü, Docker kapsayıcısında Dayanıklı Görev Zamanlayıcı'yı ve bir görev hub'ını simüle eder, bu da onu yerel geliştirme için ideal kılar. Örnek kod, ortam değişkeni ayarlanmamışsa emülatöre otomatik olarak bağlanır.

  1. Azure-Samples/Durable-Task-Scheduler kök dizininden .NET SDK örnek dizinine gidin.

    cd samples/durable-task-sdks/dotnet/FanOutFanIn
    
  2. Öykünücü için Docker görüntüsünü çekin.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Öykünücüyü çalıştırın. Kapsayıcının hazır olması birkaç saniye sürebilir.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Örnek kod otomatik olarak varsayılan öykünücü ayarlarını kullandığından, ortam değişkenlerini ayarlamanız gerekmez. Bu hızlı başlangıç için varsayılan öykünücü ayarları şunlardır:

  • Uç nokta: http://localhost:8080
  • Görev hub'ı: default
  1. Azure-Samples/Durable-Task-Scheduler kök dizininden Python SDK örnek dizinine gidin.

    cd samples/durable-task-sdks/python/fan-out-fan-in
    
  2. Öykünücü için Docker görüntüsünü çekin.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Öykünücüyü çalıştırın. Kapsayıcının hazır olması birkaç saniye sürebilir.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Örnek kod otomatik olarak varsayılan öykünücü ayarlarını kullandığından, ortam değişkenlerini ayarlamanız gerekmez. Bu hızlı başlangıç için varsayılan öykünücü ayarları şunlardır:

  • Uç nokta: http://localhost:8080
  • Görev hub'ı: default
  1. Azure-Samples/Durable-Task-Scheduler kök dizininden Java SDK örnek dizinine gidin.

    cd samples/durable-task-sdks/java/fan-out-fan-in
    
  2. Öykünücü için Docker görüntüsünü çekin.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Öykünücüyü çalıştırın. Kapsayıcının hazır olması birkaç saniye sürebilir.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Örnek kod otomatik olarak varsayılan öykünücü ayarlarını kullandığından, ortam değişkenlerini ayarlamanız gerekmez. Bu hızlı başlangıç için varsayılan öykünücü ayarları şunlardır:

  • Uç nokta: http://localhost:8080
  • Görev hub'ı: default
  1. Azure-Samples/Durable-Task-Scheduler kök dizininden JavaScript SDK örnek dizinine gidin.

    cd samples/durable-task-sdks/javascript/fan-out-fan-in
    
  2. Öykünücü için Docker görüntüsünü çekin.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. Öykünücüyü çalıştırın. Kapsayıcının hazır olması birkaç saniye sürebilir.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

Örnek kod otomatik olarak varsayılan öykünücü ayarlarını kullandığından, ortam değişkenlerini ayarlamanız gerekmez. Bu hızlı başlangıç için varsayılan öykünücü ayarları şunlardır:

  • Uç nokta: http://localhost:8080
  • Görev hub'ı: default

Hızlı başlangıcı çalıştırma

  1. FanOutFanIn dizininden Worker dizinine gidin, çalışanı oluşturup çalıştırın.

    cd Worker
    dotnet build
    dotnet run
    
  2. Ayrı bir terminalde FanOutFanIn dizininden Client dizinine giderek istemciyi derleyin ve çalıştırın.

    cd Client
    dotnet build
    dotnet run
    

Çıkışı anlama

Bu örneği çalıştırdığınızda hem çalışan hem de istemci işlemlerinden çıkış alırsınız. Projeyi çalıştırdığınızda kodda neler olduğunu açın.

Çalışan verimliliği

Çalışan çıktısı şunları gösterir:

  • Düzenleyicinin ve etkinliklerin kaydı
  • Her etkinlik çağrıldığında günlük girdileri kaydedilir.
  • Birden çok iş öğesinin paralel işlenmesi
  • Sonuçların son toplaması

İstemci çıkışı

İstemci çıkışı şunları gösterir:

  • İş öğelerinin listesiyle başlayan orkestrasyon
  • Benzersiz orkestrasyon örneği kimliği
  • Her iş öğesini ve buna karşılık gelen sonucu gösteren son toplu sonuçlar
  • İşlenen öğelerin toplam sayısı

Örnek çıkış

Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
  1. Python sanal ortamını etkinleştirme.

    python -m venv venv
    /venv/Scripts/activate
    
  2. Gerekli paketleri yükleyin.

    pip install -r requirements.txt
    
  3. Çalışanı başlatın.

    python worker.py
    

    Beklenen çıkış

    Çalışanın başladığını ve iş öğelerini beklediğini gösteren çıktıyı görebilirsiniz.

    Starting Fan Out/Fan In pattern worker...
    Using taskhub: default
    Using endpoint: http://localhost:8080
    Starting gRPC worker that connects to http://localhost:8080
    Successfully connected to http://localhost:8080. Waiting for work items...
    
  4. Yeni bir terminalde sanal ortamı etkinleştirin ve istemcisini çalıştırın.

    venv/Scripts/activate
    python client.py
    

    İş öğelerinin sayısını bağımsız değişken olarak sağlayabilirsiniz. Bağımsız değişken sağlanmazsa, örnek varsayılan olarak 10 öğe çalıştırır.

    python client.py [number_of_items]
    

Çıkışı anlama

Bu örneği çalıştırdığınızda hem çalışan hem de istemci işlemlerinden çıkış alırsınız. Projeyi çalıştırdığınızda kodda neler olduğunu açın.

Çalışan verimliliği

Çalışan çıktısı şunları gösterir:

  • Orkestra yöneticisinin ve etkinliklerin kaydı.
  • Her iş öğesi paralel olarak işlenirken, eşzamanlı olarak yürütüldüğünü gösteren durum iletileri.
  • Değişen işlem sürelerinin benzetimini yapmak için her iş öğesi için rastgele gecikmeler (0,5 ile 2 saniye arasında).
  • Sonuçların toplanma işlemini gösteren son ileti.

İstemci çıkışı

İstemci çıkışı şunları gösterir:

  • Belirtilen sayıda iş öğesiyle başlayan düzenleme.
  • Benzersiz orkestrasyon örnek kimliği.
  • Aşağıdakiler dahil son toplanan sonuç:
    • İşlenen öğelerin toplam sayısı
    • Tüm sonuçların toplamı (her öğe sonucu değerinin karesi)
    • Tüm sonuçların ortalaması

Örnek çıkış

Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED

Dizininden fan-out-fan-in Gradle kullanarak uygulamayı derleyin ve çalıştırın.

./gradlew runFanOutFanInPattern

Tip

hata iletisini zsh: permission denied: ./gradlewalırsanız, uygulamayı çalıştırmadan önce çalıştırmayı chmod +x gradlew deneyin.

Çıkışı anlama

Bu örneği çalıştırdığınızda şunları gösteren bir çıkış alırsınız:

  • Orkestra yöneticisinin ve etkinliklerin kaydı.
  • Her iş öğesi paralel olarak işlenirken, eşzamanlı olarak yürütüldüğünü gösteren durum iletileri.
  • Değişen işlem sürelerinin benzetimini yapmak için her iş öğesi için rastgele gecikmeler (0,5 ile 2 saniye arasında).
  • Sonuçların toplanma işlemini gösteren son ileti.

Projeyi çalıştırdığınızda kodda neler olduğunu açın.

Örnek çıkış

Starting a Gradle Daemon (subsequent builds will be faster)

> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60
  1. Gerekli npm paketlerini yükleyin.

    npm install
    
  2. Çalışanı başlatın.

    npm run worker
    

    Beklenen çıkış

    Çalışanın başladığını ve iş öğelerini beklediğini gösteren çıktıyı görebilirsiniz.

    Using local emulator with no authentication
    Starting worker...
    Worker is ready and listening for tasks.
    
  3. Yeni bir terminalde fan-out-fan-in , dizininden istemcisini çalıştırın.

    npm run client
    

    İş öğelerinin sayısını bağımsız değişken olarak sağlayabilirsiniz. Bağımsız değişken sağlanmazsa, örnek varsayılan olarak 10 öğe çalıştırır.

    npm run client -- 15
    

Çıkışı anlama

Bu örneği çalıştırdığınızda hem çalışan hem de istemci işlemlerinden çıkış alırsınız. Projeyi çalıştırdığınızda kodda neler olduğunu açın.

Çalışan verimliliği

Çalışan çıktısı şunları gösterir:

  • Her iş öğesi paralel olarak işlenirken, eşzamanlı olarak yürütüldüğünü gösteren durum iletileri.
  • Değişen işlem sürelerinin benzetimini yapmak için her iş öğesi için rastgele gecikmeler (0,5 ile 2 saniye arasında).
  • Sonuçların toplanma işlemini gösteren son ileti.

İstemci çıkışı

İstemci çıkışı şunları gösterir:

  • Belirtilen sayıda iş öğesiyle başlayan düzenleme.
  • Benzersiz orkestrasyon örnek kimliği.
  • Aşağıdakiler dahil son toplanan sonuç:
    • İşlenen öğelerin toplam sayısı
    • Tüm sonuçların toplamı (her öğe sonucu değerinin karesi)
    • Tüm sonuçların ortalaması

Örnek çıkış

Using local emulator with no authentication
Starting fan out/fan in orchestration with 10 items
Started orchestration with ID: abc123-def456-ghi789
Waiting for orchestration to complete...
Processing work item 1 (delay 823ms)
Processing work item 2 (delay 1205ms)
Processing work item 3 (delay 512ms)
Processing work item 4 (delay 1890ms)
Processing work item 5 (delay 645ms)
Processing work item 6 (delay 1102ms)
Processing work item 7 (delay 933ms)
Processing work item 8 (delay 1567ms)
Processing work item 9 (delay 701ms)
Processing work item 10 (delay 1344ms)
Aggregating 10 results...
Orchestration completed with status: COMPLETED
Result: {"totalItems":10,"sum":385,"average":38.5,"results":[...]}

Projeyi yerel olarak çalıştırdığınıza göre, Azure Container Apps'te barındırılan Azure'a dağıtım yapmayı öğrenin.

Düzenleme durumunu ve geçmişini görüntüleme

Düzenleme durumunu ve geçmişini Dayanıklı Görev Zamanlayıcı panosu aracılığıyla görüntüleyebilirsiniz. Pano varsayılan olarak 8082 numaralı bağlantı noktasında çalışır.

  1. Web tarayıcınızda http://localhost:8082 adresine gidin.
  2. Varsayılan görev hub'ına tıklayın. Oluşturduğunuz düzenleme örneği listede yer alır.
  3. Yürütme ayrıntılarını görüntülemek için orkestrasyon örneği kimliğine tıklayın. Ayrıntılar şunları içerir:
    • Birden çok etkinlik görevinin paralel yürütülmesi
    • Birleşme adımı
    • Her adımda giriş ve çıkış
    • Her adım için geçen süre

 .NET örneği için orkestrasyon örneğinin ayrıntılarını gösteren ekran görüntüsü.

Python örneği için düzenleme örneğinin ayrıntılarını gösteren ekran görüntüsü.

Java örnek çalışması için düzenleme örneğinin ayrıntılarını gösteren ekran görüntüsü.

JavaScript örneği için düzenleme örneğinin ayrıntılarını gösteren ekran görüntüsü.

Kod yapısını anlama

İşçi projesi

Çalışan proje düzenlemesi, fan-out/fan-in desenini göstermek için paralel etkinlik görevleri oluşturur ve tümünün tamamlanmasını bekler. Düzenleyici:

  1. Girdi olarak iş öğelerinin listesini alır.
  2. Her iş öğesi için ayrı bir görev oluşturarak ProcessWorkItemActivity kullanarak görevleri dağıtır.
  3. Tüm görevleri paralel olarak yürütür.
  4. Tüm görevlerin tamamlanmasını Task.WhenAll kullanarak bekler.
  5. Sonuçların tamamını AggregateResultsActivity kullanarak tek tek toplayarak hayranları dahil et.
  6. toplanan son sonucu istemciye döndürür.

Çalışan projesi aşağıdakileri içerir:

  • ParallelProcessingOrchestration.cs: Tek bir dosyada düzenleyici ve etkinlik işlevlerini tanımlar.
  • Program.cs: Çalışma sunucusunu uygun bağlantı dizesi yönetimi ile ayarlar.

ParallelProcessingOrchestration.cs

Düzenleme, fan-out/fan-in kullanarak paralel etkinlik görevleri oluşturur ve tümünün tamamlanmasını bekler.

public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
    // Step 1: Fan-out by creating a task for each work item in parallel
    List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();

    foreach (string workItem in workItems)
    {
        // Create a task for each work item (fan-out)
        Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
            nameof(ProcessWorkItemActivity), workItem);
        processingTasks.Add(task);
    }

    // Step 2: Wait for all parallel tasks to complete
    Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

    // Step 3: Fan-in by aggregating all results
    Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
        nameof(AggregateResultsActivity), results);

    return aggregatedResults;
}

Her etkinlik özniteliğiyle [DurableTask] dekore edilmiş ayrı bir sınıf olarak uygulanır.

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    // Implementation processes a single work item
}

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    // Implementation aggregates individual results
}

Program.cs

Çalışan, doğru yaşam döngüsü yönetimi için kullanır Microsoft.Extensions.Hosting .

using Microsoft.Extensions.Hosting;
//..

builder.Services.AddDurableTaskWorker()
    .AddTasks(registry =>
    {
        registry.AddOrchestrator<ParallelProcessingOrchestration>();
        registry.AddActivity<ProcessWorkItemActivity>();
        registry.AddActivity<AggregateResultsActivity>();
    })
    .UseDurableTaskScheduler(connectionString);

Müşteri projesi

Müşteri Projesi:

  • Çalışanla aynı bağlantı dizesi mantığını kullanır.
  • Paralel olarak işlenecek iş öğelerinin listesini oluşturur.
  • Listeyi girdi olarak kullanarak bir orkestrasyon örneği zamanlanır.
  • Orkestrasyonun tamamlanmasını bekler ve birleştirilmiş sonuçları görüntüler.
  • Verimli yoklama için WaitForInstanceCompletionAsync kullanır.
List<string> workItems = new List<string>
{
    "Task1",
    "Task2",
    "Task3",
    "LongerTask4",
    "VeryLongTask5"
};

// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    "ParallelProcessingOrchestration", 
    workItems);

// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true,
    cts.Token);

worker.py

Çalışan proje düzenlemesi, fan-out/fan-in desenini göstermek için paralel etkinlik görevleri oluşturur ve tümünün tamamlanmasını bekler. Düzenleyici:

  1. Girdi olarak iş öğelerinin listesini alır.
  2. Her iş öğesi için (her biri için process_work_item çağırarak) paralel görevler oluşturarak "dağılır."
  3. Tüm görevlerin tamamlanmasını task.when_all kullanarak bekler.
  4. Ardından aggregate_results aktivitesi ile sonuçları birleştirerek "fan in" yapar.
  5. Toplanan son sonuç istemciye döndürülür.

Düzenleme, fan-out/fan-in kullanarak paralel etkinlik görevleri oluşturur ve tümünün tamamlanmasını bekler.

# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
    logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")

    # Fan out: Create a task for each work item
    parallel_tasks = []
    for item in work_items:
        parallel_tasks.append(ctx.call_activity("process_work_item", input=item))

    # Wait for all tasks to complete
    logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
    results = yield task.when_all(parallel_tasks)

    # Fan in: Aggregate all the results
    logger.info("All parallel tasks completed, aggregating results")
    final_result = yield ctx.call_activity("aggregate_results", input=results)

    return final_result

client.py

Müşteri Projesi:

  • Çalışanla aynı bağlantı dizesi mantığını kullanır.
  • Paralel olarak işlenecek iş öğelerinin listesini oluşturur.
  • Listeyi girdi olarak kullanarak bir orkestrasyon örneği zamanlanır.
  • Orkestrasyonun tamamlanmasını bekler ve birleştirilmiş sonuçları görüntüler.
  • Verimli yoklama için wait_for_orchestration_completion kullanır.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))

logger.info(f"Starting new fan out/fan in orchestration with {count} work items")

# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
    "fan_out_fan_in_orchestrator", 
    input=work_items
)

logger.info(f"Started orchestration with ID = {instance_id}")

# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
    instance_id,
    timeout=60
)

Proje düzenlemesi, fan-out/fan-in deseniniFanOutFanInPattern göstermek için paralel etkinlik görevleri oluşturur ve tümünün tamamlanmasını bekler. Düzenleyici:

  1. Girdi olarak iş öğelerinin listesini alır.
  2. Her iş öğesi için ayrı bir görev oluşturarak callActivity kullanarak görevleri dağıtır.
  3. Tüm görevleri paralel olarak yürütür.
  4. Tüm görevlerin tamamlanmasını allOf kullanarak bekler.
  5. Sonuçların tamamını stream().mapToInt().sum() kullanarak tek tek toplayarak hayranları dahil et.
  6. toplanan son sonucu istemciye döndürür.

Proje aşağıdakileri içerir:

  • DurableTaskSchedulerWorkerExtensions worker: Düzenleyici ve etkinlik işlevlerini tanımlar.
  • DurableTaskSchedulerClientExtension client: Çalışan ana bilgisayarı doğru bağlantı dizesi işleme ile ayarlar.

Çalışan

Düzenleme, fan-out/fan-in kullanarak paralel etkinlik görevleri oluşturur ve tümünün tamamlanmasını bekler.

DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "FanOutFanIn_WordCount"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                List<?> inputs = ctx.getInput(List.class);
                List<Task<Integer>> tasks = inputs.stream()
                        .map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
                        .collect(Collectors.toList());
                List<Integer> allWordCountResults = ctx.allOf(tasks).await();
                int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
                ctx.complete(totalWordCount);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "CountWords"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringTokenizer tokenizer = new StringTokenizer(input);
                return tokenizer.countTokens();
            };
        }
    })
    .build();

// Start the worker
worker.start();

Müşteri

Müşteri Projesi:

  • Çalışanla aynı bağlantı dizesi mantığını kullanır.
  • Paralel olarak işlenecek iş öğelerinin listesini oluşturur.
  • Listeyi girdi olarak kullanarak bir orkestrasyon örneği zamanlanır.
  • Orkestrasyonun tamamlanmasını bekler ve birleştirilmiş sonuçları görüntüler.
  • Verimli yoklama için waitForInstanceCompletion kullanır.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();

// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog.",
        "If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
        "The greatest glory in living lies not in never falling, but in rising every time we fall.",
        "Always remember that you are absolutely unique. Just like everyone else.");

// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
        "FanOutFanIn_WordCount",
        new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));

Fan-out/fan-in desenini göstermek için, örnek orkestrasyon paralel etkinlik görevleri oluşturur ve tümünün tamamlanmasını bekler. Düzenleyici:

  1. Girdi olarak iş öğelerinin listesini alır.
  2. Her iş öğesi için paralel görevler oluşturarak (her biri için processWorkItem çağrısı yaparak) görevleri dağıtır.
  3. Tüm görevlerin tamamlanmasını whenAll kullanarak bekler.
  4. Sonuçları aggregateResults etkinliğiyle birleştirerek toplayan hayranlar.
  5. toplanan son sonucu istemciye döndürür.

Proje aşağıdakileri içerir:

  • worker.mjs: Düzenleyici ve etkinlik işlevlerini tanımlar.
  • client.mjs: Orkestrasyonları zamanlar ve tamamlanmasını bekler.

worker.mjs

Düzenleme, fan-out/fan-in kullanarak paralel etkinlik görevleri oluşturur ve tümünün tamamlanmasını bekler.

import { whenAll } from "@microsoft/durabletask-js";
import { createAzureManagedWorkerBuilder } from "@microsoft/durabletask-js-azuremanaged";

// Orchestrator function using generator syntax
const fanOutFanInOrchestrator = async function* (ctx, workItems) {
  const items = Array.isArray(workItems) ? workItems : [];

  // Fan out: schedule parallel activity calls
  const tasks = items.map((item) => ctx.callActivity(processWorkItem, item));

  // Wait for all tasks using whenAll
  const processedResults = yield whenAll(tasks);

  // Fan in: aggregate results
  const finalResult = yield ctx.callActivity(aggregateResults, processedResults);

  return finalResult;
};

// Activity that processes a single work item
const processWorkItem = async (_ctx, workItem) => {
  const normalizedItem = Number(workItem);
  const delayMs = 500 + Math.floor(Math.random() * 1500);
  console.log(`Processing work item ${normalizedItem} (delay ${delayMs}ms)`);

  await new Promise((resolve) => setTimeout(resolve, delayMs));

  return {
    item: normalizedItem,
    result: normalizedItem * normalizedItem,
  };
};

// Activity that aggregates results
const aggregateResults = async (_ctx, results) => {
  const sum = results.reduce((acc, curr) => acc + curr.result, 0);

  return {
    totalItems: results.length,
    sum,
    average: results.length ? sum / results.length : 0,
    results,
  };
};

// Build and start worker
const connectionString = `Endpoint=http://localhost:8080;Authentication=None;TaskHub=default`;
const worker = createAzureManagedWorkerBuilder(connectionString)
  .addOrchestrator(fanOutFanInOrchestrator)
  .addActivity(processWorkItem)
  .addActivity(aggregateResults)
  .build();

await worker.start();

client.mjs

Müşteri Projesi:

  • Çalışanla aynı bağlantı dizesi mantığını kullanır.
  • Paralel olarak işlenecek iş öğelerinin listesini oluşturur.
  • Listeyi girdi olarak kullanarak bir orkestrasyon örneği zamanlanır.
  • Orkestrasyonun tamamlanmasını bekler ve birleştirilmiş sonuçları görüntüler.
  • Verimli yoklama için waitForOrchestrationCompletion kullanır.
import { OrchestrationStatus } from "@microsoft/durabletask-js";
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";

// Create work items array
const count = 10;
const workItems = Array.from({ length: count }, (_unused, index) => index + 1);

const connectionString = `Endpoint=http://localhost:8080;Authentication=None;TaskHub=default`;
const client = createAzureManagedClient(connectionString);

// Schedule orchestration
const instanceId = await client.scheduleNewOrchestration(
  "fanOutFanInOrchestrator",
  workItems
);

console.log(`Started orchestration with ID: ${instanceId}`);

// Wait for completion
const state = await client.waitForOrchestrationCompletion(instanceId, true, 120);

if (state.runtimeStatus === OrchestrationStatus.COMPLETED) {
  console.log(`Result: ${state.serializedOutput}`);
}

await client.stop();

Sonraki Adımlar

Örneği Dayanıklı Görev Zamanlayıcı öykünücüsü kullanarak yerel olarak çalıştırdığınıza göre, bir zamanlama sistemi ve görev merkezi kaynağı oluşturmayı ve Azure Container Apps üzerine dağıtmayı deneyin.