مشاركة عبر


البدء السريع: أنشئ تطبيقا باستخدام مجموعات تطوير المهام الدائمة وجدولة المهام الدائمة

توفر Durable Task SDKs مكتبة عميل خفيفة الوزن ل Durable Task Scheduler. في هذا التشغيل السريع، ستتعلم كيفية إنشاء تنسيقات تستخدم نمط تطبيق fan-out/fan-in لإجراء معالجة متوازية.

مهم

حاليا، حزمة تطوير المهام الدائمة PowerShell غير متوفرة.

  • إعداد وتشغيل محاكي جدولة المهام الدائمة للتطوير المحلي.
  • تشغيل مشاريع العامل والعميل.
  • راجع حالة التزامن والمحفوظات عبر لوحة معلومات Durable Task Scheduler.

المتطلبات المسبقه

قبل البدء:

  • تأكد من أن لديك .NET 8 SDK أو أحدث.
  • تثبيت Docker لتشغيل المحاكي.
  • قم باستنساخ ><جدولة المهام المتطورة GitHub المستودع لاستخدام عينة البدء السريع.
  • تأكد من أن لديك Python 3.9+ أو أبعد.
  • تثبيت Docker لتشغيل المحاكي.
  • قم باستنساخ ><جدولة المهام المتطورة GitHub المستودع لاستخدام عينة البدء السريع.
  • تأكد من أن لديك Java 8 أو 11.
  • تثبيت Docker لتشغيل المحاكي.
  • قم باستنساخ ><جدولة المهام المتطورة GitHub المستودع لاستخدام عينة البدء السريع.
  • تأكد من أن لديك Node.js 22 أو أكثر.
  • تثبيت Docker لتشغيل المحاكي.
  • قم باستنساخ ><جدولة المهام المتطورة GitHub المستودع لاستخدام عينة البدء السريع.

إعداد محاكي جدولة المهام الدائمة

تبحث التعليمات البرمجية للتطبيق عن مجدول تم نشره ومورد مركز المهام. إذا لم يتم العثور على أي شيء، تعود التعليمات البرمجية إلى المحاكي. يحاكي المحاكي المجدول ومركز المهام في حاوية Docker، ما يجعله مثاليا للتطوير المحلي المطلوب في هذا التشغيل السريع.

  1. من مجلد الجذر Azure-Samples/Durable-Task-Scheduler، انتقل إلى مجلد عينات SDK .NET.

    cd samples/durable-task-sdks/dotnet/FanOutFanIn
    
  2. اسحب صورة Docker للمحاكي.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. تشغيل المحاكي. قد تستغرق الحاوية بضع ثوان لتكون جاهزة.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

نظرا لأن التعليمات البرمجية للمثال تستخدم تلقائيا إعدادات المحاكي الافتراضية، فلن تحتاج إلى تعيين أي متغيرات بيئة. إعدادات المحاكي الافتراضية لهذا التشغيل السريع هي:

  • نقطه النهايه: http://localhost:8080
  • مركز المهام: default
  1. من مجلد الجذر Azure-Samples/Durable-Task-Scheduler، انتقل إلى دليل عينات SDK الخاص ب Python.

    cd samples/durable-task-sdks/python/fan-out-fan-in
    
  2. اسحب صورة Docker للمحاكي.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. تشغيل المحاكي. قد تستغرق الحاوية بضع ثوان لتكون جاهزة.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

نظرا لأن التعليمات البرمجية للمثال تستخدم تلقائيا إعدادات المحاكي الافتراضية، فلن تحتاج إلى تعيين أي متغيرات بيئة. إعدادات المحاكي الافتراضية لهذا التشغيل السريع هي:

  • نقطه النهايه: http://localhost:8080
  • مركز المهام: default
  1. من مجلد الجذر Azure-Samples/Durable-Task-Scheduler، انتقل إلى دليل عينات SDK الخاص ب Java.

    cd samples/durable-task-sdks/java/fan-out-fan-in
    
  2. اسحب صورة Docker للمحاكي.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. تشغيل المحاكي. قد تستغرق الحاوية بضع ثوان لتكون جاهزة.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

نظرا لأن التعليمات البرمجية للمثال تستخدم تلقائيا إعدادات المحاكي الافتراضية، فلن تحتاج إلى تعيين أي متغيرات بيئة. إعدادات المحاكي الافتراضية لهذا التشغيل السريع هي:

  • نقطه النهايه: http://localhost:8080
  • مركز المهام: default
  1. من مجلد الجذر Azure-Samples/Durable-Task-Scheduler، انتقل إلى دليل نماذج SDK الخاص بجافاسكريبت.

    cd samples/durable-task-sdks/javascript/fan-out-fan-in
    
  2. اسحب صورة Docker للمحاكي.

    docker pull mcr.microsoft.com/dts/dts-emulator:latest
    
  3. تشغيل المحاكي. قد تستغرق الحاوية بضع ثوان لتكون جاهزة.

    docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
    

نظرا لأن التعليمات البرمجية للمثال تستخدم تلقائيا إعدادات المحاكي الافتراضية، فلن تحتاج إلى تعيين أي متغيرات بيئة. إعدادات المحاكي الافتراضية لهذا التشغيل السريع هي:

  • نقطه النهايه: http://localhost:8080
  • مركز المهام: default

تشغيل التشغيل السريع

  1. FanOutFanIn من الدليل، انتقل إلى Worker الدليل لإنشاء العامل وتشغيله.

    cd Worker
    dotnet build
    dotnet run
    
  2. في محطة طرفية منفصلة FanOutFanIn ، من الدليل، انتقل إلى Client الدليل لإنشاء العميل وتشغيله.

    cd Client
    dotnet build
    dotnet run
    

فهم الإخراج

عند تشغيل هذه العينة، تتلقى الإخراج من كل من عمليات العامل والعميل. فك حزمة ما حدث في التعليمات البرمجية عند تشغيل المشروع.

إخراج العامل

يظهر إخراج العامل:

  • تسجيل المنسق والأنشطة
  • تسجيل الإدخالات عند استدعاء كل نشاط
  • المعالجة المتوازية لعناصر عمل متعددة
  • التجميع النهائي للنتائج

إخراج العميل

يظهر إخراج العميل ما يلي:

  • التزامن يبدأ بقائمة عناصر العمل
  • معرف مثيل التنسيق الفريد
  • النتائج المجمعة النهائية، تظهر كل عنصر عمل والنتيجة المقابلة له
  • العدد الإجمالي للعناصر المعالجة

مثال على الإخراج

Starting Fan-Out Fan-In Pattern - Parallel Processing Client
Using local emulator with no authentication
Starting parallel processing orchestration with 5 work items
Work items: ["Task1","Task2","Task3","LongerTask4","VeryLongTask5"]
Started orchestration with ID: 7f8e9a6b-1c2d-3e4f-5a6b-7c8d9e0f1a2b
Waiting for orchestration to complete...
Orchestration completed with status: Completed
Processing results:
Work item: Task1, Result: 5
Work item: Task2, Result: 5
Work item: Task3, Result: 5
Work item: LongerTask4, Result: 11
Work item: VeryLongTask5, Result: 13
Total items processed: 5
  1. تفعيل بيئة افتراضية بلغة Python.

    python -m venv venv
    /venv/Scripts/activate
    
  2. تثبيت الحزم المطلوبة.

    pip install -r requirements.txt
    
  3. بدء تشغيل العامل.

    python worker.py
    

    الإخراج المتوقع

    يمكنك مشاهدة الإخراج الذي يشير إلى أن العامل بدأ وينتظر عناصر العمل.

    Starting Fan Out/Fan In pattern worker...
    Using taskhub: default
    Using endpoint: http://localhost:8080
    Starting gRPC worker that connects to http://localhost:8080
    Successfully connected to http://localhost:8080. Waiting for work items...
    
  4. في محطة طرفية جديدة، قم بتنشيط البيئة الظاهرية وتشغيل العميل.

    venv/Scripts/activate
    python client.py
    

    يمكنك توفير عدد عناصر العمل كوسيطة. إذا لم يتم توفير وسيطة، يقوم المثال بتشغيل 10 عناصر بشكل افتراضي.

    python client.py [number_of_items]
    

فهم الإخراج

عند تشغيل هذه العينة، تتلقى الإخراج من كل من عمليات العامل والعميل. فك حزمة ما حدث في التعليمات البرمجية عند تشغيل المشروع.

إخراج العامل

يظهر إخراج العامل:

  • تسجيل المنسق والأنشطة.
  • رسائل الحالة عند معالجة كل عنصر عمل بالتوازي، تظهر أنها تنفذ بشكل متزامن.
  • تأخيرات عشوائية لكل عنصر عمل (بين 0.5 و2 ثانية) لمحاكاة أوقات المعالجة المختلفة.
  • رسالة نهائية تظهر تجميع النتائج.

إخراج العميل

يظهر إخراج العميل ما يلي:

  • التزامن بدءا من العدد المحدد لعناصر العمل.
  • معرف مثيل التنسيق الفريد.
  • النتيجة المجمعة النهائية، والتي تتضمن:
    • العدد الإجمالي للعناصر التي تمت معالجتها
    • مجموع جميع النتائج (كل نتيجة عنصر هي مربع قيمته)
    • متوسط جميع النتائج

مثال على الإخراج

Starting fan out/fan in orchestration with 10 items
Waiting for 10 parallel tasks to complete
Orchestrator yielded with 10 task(s) and 0 event(s) outstanding.
Processing work item: 1
Processing work item: 2
Processing work item: 10
Processing work item: 9
Processing work item: 8
Processing work item: 7
Processing work item: 6
Processing work item: 5
Processing work item: 4
Processing work item: 3
Orchestrator yielded with 9 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 8 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 7 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 6 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 5 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 4 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 3 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 2 task(s) and 0 event(s) outstanding.
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
All parallel tasks completed, aggregating results
Orchestrator yielded with 1 task(s) and 0 event(s) outstanding.
Aggregating results from 10 items
Orchestration completed with status: COMPLETED

من الدليل، قم fan-out-fan-in بإنشاء وتشغيل التطبيق باستخدام Gradle.

./gradlew runFanOutFanInPattern

نصيحة

إذا تلقيت رسالة zsh: permission denied: ./gradlewالخطأ ، فحاول التشغيل chmod +x gradlew قبل تشغيل التطبيق.

فهم الإخراج

عند تشغيل هذا النموذج، تتلقى الإخراج الذي يظهر:

  • تسجيل المنسق والأنشطة.
  • رسائل الحالة عند معالجة كل عنصر عمل بالتوازي، تظهر أنها تنفذ بشكل متزامن.
  • تأخيرات عشوائية لكل عنصر عمل (بين 0.5 و2 ثانية) لمحاكاة أوقات المعالجة المختلفة.
  • رسالة نهائية تظهر تجميع النتائج.

فك حزمة ما حدث في التعليمات البرمجية عند تشغيل المشروع.

مثال على الإخراج

Starting a Gradle Daemon (subsequent builds will be faster)

> Task :runFanOutFanInPattern
Durable Task worker is connecting to sidecar at localhost:8080.
Started new orchestration instance
Orchestration completed: [Name: 'FanOutFanIn_WordCount', ID: '<id-number>', RuntimeStatus: COMPLETED, CreatedAt: 2025-04-25T15:24:47.170Z, LastUpdatedAt: 2025-04-25T15:24:47.287Z, Input: '["Hello, world!","The quick brown fox jumps over t...', Output: '60']
Output: 60
  1. قم بتثبيت حزم npm المطلوبة.

    npm install
    
  2. بدء تشغيل العامل.

    npm run worker
    

    الإخراج المتوقع

    يمكنك مشاهدة الإخراج الذي يشير إلى أن العامل بدأ وينتظر عناصر العمل.

    Using local emulator with no authentication
    Starting worker...
    Worker is ready and listening for tasks.
    
  3. في محطة جديدة، من fan-out-fan-in الدليل، شغل العميل.

    npm run client
    

    يمكنك توفير عدد عناصر العمل كوسيطة. إذا لم يتم توفير وسيطة، يقوم المثال بتشغيل 10 عناصر بشكل افتراضي.

    npm run client -- 15
    

فهم الإخراج

عند تشغيل هذه العينة، تتلقى الإخراج من كل من عمليات العامل والعميل. فك حزمة ما حدث في التعليمات البرمجية عند تشغيل المشروع.

إخراج العامل

يظهر إخراج العامل:

  • رسائل الحالة عند معالجة كل عنصر عمل بالتوازي، تظهر أنها تنفذ بشكل متزامن.
  • تأخيرات عشوائية لكل عنصر عمل (بين 0.5 و2 ثانية) لمحاكاة أوقات المعالجة المختلفة.
  • رسالة نهائية تظهر تجميع النتائج.

إخراج العميل

يظهر إخراج العميل ما يلي:

  • التزامن بدءا من العدد المحدد لعناصر العمل.
  • معرف مثيل التنسيق الفريد.
  • النتيجة المجمعة النهائية، والتي تتضمن:
    • العدد الإجمالي للعناصر التي تمت معالجتها
    • مجموع جميع النتائج (كل نتيجة عنصر هي مربع قيمته)
    • متوسط جميع النتائج

مثال على الإخراج

Using local emulator with no authentication
Starting fan out/fan in orchestration with 10 items
Started orchestration with ID: abc123-def456-ghi789
Waiting for orchestration to complete...
Processing work item 1 (delay 823ms)
Processing work item 2 (delay 1205ms)
Processing work item 3 (delay 512ms)
Processing work item 4 (delay 1890ms)
Processing work item 5 (delay 645ms)
Processing work item 6 (delay 1102ms)
Processing work item 7 (delay 933ms)
Processing work item 8 (delay 1567ms)
Processing work item 9 (delay 701ms)
Processing work item 10 (delay 1344ms)
Aggregating 10 results...
Orchestration completed with status: COMPLETED
Result: {"totalItems":10,"sum":385,"average":38.5,"results":[...]}

الآن بعد أن أدرت المشروع محليا، يمكنك الآن تعلم كيفية deploy على Azure مستضاف في Azure Container Apps.

عرض حالة التزامن والمحفوظات

يمكنك عرض حالة التزامن والمحفوظات عبر لوحة معلومات جدولة المهام الدائمة. بشكل افتراضي، يتم تشغيل لوحة المعلومات على المنفذ 8082.

  1. انتقل إلى http://localhost:8082 في مستعرض الويب الخاص بك.
  2. انقر فوق مركز المهام الافتراضي . مثيل التنسيق الذي قمت بإنشائه موجود في القائمة.
  3. انقر فوق معرف مثيل التزامن لعرض تفاصيل التنفيذ، والتي تتضمن:
    • التنفيذ المتوازي لمهام نشاط متعددة
    • خطوة تجميع المروحة
    • الإدخال والإخراج في كل خطوة
    • الوقت المستغرق لكل خطوة

لقطة شاشة تظهر تفاصيل حالة التوزيع الموسيقي للعينة .NET.

لقطة شاشة تظهر تفاصيل حالة التوزيع الموسيقي لعينة Python.

لقطة شاشة تظهر تفاصيل حالة التوزيع الموسيقي لعينة Java.

لقطة شاشة تظهر تفاصيل حالة التوزيع الموسيقي لعينة جافاسكريبت.

فهم بنية التعليمات البرمجية

مشروع العامل

لإظهار نمط توزيع المهام/توزيع المروحة، ينشئ تنسيق مشروع العامل مهام نشاط متوازية وينتظر اكتمال الجميع. المنسق:

  1. يأخذ قائمة بعناصر العمل كمدخل.
  2. مرر المشجعين عن طريق إنشاء مهمة منفصلة لكل عنصر عمل باستخدام ProcessWorkItemActivity.
  3. تنفيذ جميع المهام بالتوازي.
  4. ينتظر حتى تكتمل جميع المهام باستخدام Task.WhenAll.
  5. المشجعين في عن طريق تجميع جميع النتائج الفردية باستخدام AggregateResultsActivity.
  6. إرجاع النتيجة المجمعة النهائية إلى العميل.

يحتوي مشروع العامل على:

  • ParallelProcessingOrchestration.cs: يحدد دالات المنسق والنشاط في ملف واحد.
  • Program.cs: يجهز مضيف العامل التعامل المناسب مع connection string.

ParallelProcessingOrchestration.cs

باستخدام fan-out/fan-in، ينشئ التنسيق مهام نشاط متوازية وينتظر اكتمال الجميع.

public override async Task<Dictionary<string, int>> RunAsync(TaskOrchestrationContext context, List<string> workItems)
{
    // Step 1: Fan-out by creating a task for each work item in parallel
    List<Task<Dictionary<string, int>>> processingTasks = new List<Task<Dictionary<string, int>>>();

    foreach (string workItem in workItems)
    {
        // Create a task for each work item (fan-out)
        Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
            nameof(ProcessWorkItemActivity), workItem);
        processingTasks.Add(task);
    }

    // Step 2: Wait for all parallel tasks to complete
    Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

    // Step 3: Fan-in by aggregating all results
    Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
        nameof(AggregateResultsActivity), results);

    return aggregatedResults;
}

يتم تنفيذ كل نشاط كفئة منفصلة مزينة بالسمة [DurableTask] .

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    // Implementation processes a single work item
}

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    // Implementation aggregates individual results
}

Program.cs

يستخدم Microsoft.Extensions.Hosting العامل لإدارة دورة الحياة المناسبة.

using Microsoft.Extensions.Hosting;
//..

builder.Services.AddDurableTaskWorker()
    .AddTasks(registry =>
    {
        registry.AddOrchestrator<ParallelProcessingOrchestration>();
        registry.AddActivity<ProcessWorkItemActivity>();
        registry.AddActivity<AggregateResultsActivity>();
    })
    .UseDurableTaskScheduler(connectionString);

مشروع العميل

مشروع العميل:

  • يستخدم نفس منطق connection string المستخدم في الوركر.
  • إنشاء قائمة بعناصر العمل التي ستتم معالجتها بالتوازي.
  • جدولة مثيل تزامن مع القائمة كإدخال.
  • ينتظر اكتمال التنسيق ويعرض النتائج المجمعة.
  • يستخدم WaitForInstanceCompletionAsync للاستطلاع الفعال.
List<string> workItems = new List<string>
{
    "Task1",
    "Task2",
    "Task3",
    "LongerTask4",
    "VeryLongTask5"
};

// Schedule the orchestration with the work items
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    "ParallelProcessingOrchestration", 
    workItems);

// Wait for completion
var instance = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true,
    cts.Token);

worker.py

لإظهار نمط توزيع المهام/توزيع المروحة، ينشئ تنسيق مشروع العامل مهام نشاط متوازية وينتظر اكتمال الجميع. المنسق:

  1. يتلقى قائمة بعناصر العمل كإدخل.
  2. إنه "ينعش" عن طريق إنشاء مهام متوازية لكل عنصر عمل (استدعاء process_work_item لكل عنصر).
  3. ينتظر حتى تكتمل جميع المهام باستخدام task.when_all.
  4. ثم "المشجعين في" عن طريق تجميع النتائج مع aggregate_results النشاط.
  5. يتم إرجاع النتيجة المجمعة النهائية إلى العميل.

باستخدام fan-out/fan-in، ينشئ التنسيق مهام نشاط متوازية وينتظر اكتمال الجميع.

# Orchestrator function
def fan_out_fan_in_orchestrator(ctx, work_items: list) -> dict:
    logger.info(f"Starting fan out/fan in orchestration with {len(work_items)} items")

    # Fan out: Create a task for each work item
    parallel_tasks = []
    for item in work_items:
        parallel_tasks.append(ctx.call_activity("process_work_item", input=item))

    # Wait for all tasks to complete
    logger.info(f"Waiting for {len(parallel_tasks)} parallel tasks to complete")
    results = yield task.when_all(parallel_tasks)

    # Fan in: Aggregate all the results
    logger.info("All parallel tasks completed, aggregating results")
    final_result = yield ctx.call_activity("aggregate_results", input=results)

    return final_result

client.py

مشروع العميل:

  • يستخدم نفس منطق connection string المستخدم في الوركر.
  • إنشاء قائمة بعناصر العمل التي ستتم معالجتها بالتوازي.
  • جدولة مثيل تزامن مع القائمة كإدخال.
  • ينتظر اكتمال التنسيق ويعرض النتائج المجمعة.
  • يستخدم wait_for_orchestration_completion للاستطلاع الفعال.
# Generate work items (default 10 items if not specified)
count = int(sys.argv[1]) if len(sys.argv) > 1 else 10
work_items = list(range(1, count + 1))

logger.info(f"Starting new fan out/fan in orchestration with {count} work items")

# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(
    "fan_out_fan_in_orchestrator", 
    input=work_items
)

logger.info(f"Started orchestration with ID = {instance_id}")

# Wait for orchestration to complete
logger.info("Waiting for orchestration to complete...")
result = client.wait_for_orchestration_completion(
    instance_id,
    timeout=60
)

لإظهار نمط توزيع المهام/المروحة، FanOutFanInPattern ينشئ تنسيق المشروع مهام نشاط متوازية وينتظر اكتمال الجميع. المنسق:

  1. يأخذ قائمة بعناصر العمل كمدخل.
  2. مرر المشجعين عن طريق إنشاء مهمة منفصلة لكل عنصر عمل باستخدام ''.
  3. تنفيذ جميع المهام بالتوازي.
  4. ينتظر اكتمال جميع المهام باستخدام ''.
  5. المشجعين من خلال تجميع جميع النتائج الفردية باستخدام ''.
  6. إرجاع النتيجة المجمعة النهائية إلى العميل.

يحتوي المشروع على:

  • DurableTaskSchedulerWorkerExtensions العامل: يحدد دالات المنسق والنشاط.
  • DurableTaskSchedulerClientExtension client: يجهز مضيف العامل مع التعامل الصحيح مع connection string.

العامل

باستخدام fan-out/fan-in، ينشئ التنسيق مهام نشاط متوازية وينتظر اكتمال الجميع.

DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "FanOutFanIn_WordCount"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                List<?> inputs = ctx.getInput(List.class);
                List<Task<Integer>> tasks = inputs.stream()
                        .map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
                        .collect(Collectors.toList());
                List<Integer> allWordCountResults = ctx.allOf(tasks).await();
                int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
                ctx.complete(totalWordCount);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "CountWords"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringTokenizer tokenizer = new StringTokenizer(input);
                return tokenizer.countTokens();
            };
        }
    })
    .build();

// Start the worker
worker.start();

العميل

مشروع العميل:

  • يستخدم نفس منطق connection string المستخدم في الوركر.
  • إنشاء قائمة بعناصر العمل التي ستتم معالجتها بالتوازي.
  • جدولة مثيل تزامن مع القائمة كإدخال.
  • ينتظر اكتمال التنسيق ويعرض النتائج المجمعة.
  • يستخدم waitForInstanceCompletion للاستطلاع الفعال.
DurableTaskClient client = DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString).build();

// The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog.",
        "If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
        "The greatest glory in living lies not in never falling, but in rising every time we fall.",
        "Always remember that you are absolutely unique. Just like everyone else.");

// Schedule an orchestration which will reliably count the number of words in all the given sentences.
String instanceId = client.scheduleNewOrchestrationInstance(
        "FanOutFanIn_WordCount",
        new NewOrchestrationInstanceOptions().setInput(listOfStrings));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(int.class));

لتوضيح نمط الخروج الخارجي/الداخل، ينشئ التوزيع الموسيقي النموذجي مهام نشاط متوازية وينتظر إكمال كل شيء. المنسق:

  1. يتلقى قائمة بعناصر العمل كإدخل.
  2. تفرق عن طريق إنشاء مهام متوازية لكل عنصر عمل (استدعاء processWorkItem لكل عنصر منها).
  3. ينتظر حتى تكتمل جميع المهام باستخدام whenAll.
  4. المشجعون يجمعون النتائج مع النشاط aggregateResults .
  5. إرجاع النتيجة المجمعة النهائية إلى العميل.

يحتوي المشروع على:

  • worker.mjs: يحدد وظائف المنظم والنشاط.
  • client.mjs: تحدد جدولة التوزيع وتنتظر الانتهاء.

worker.mjs

باستخدام fan-out/fan-in، ينشئ التنسيق مهام نشاط متوازية وينتظر اكتمال الجميع.

import { whenAll } from "@microsoft/durabletask-js";
import { createAzureManagedWorkerBuilder } from "@microsoft/durabletask-js-azuremanaged";

// Orchestrator function using generator syntax
const fanOutFanInOrchestrator = async function* (ctx, workItems) {
  const items = Array.isArray(workItems) ? workItems : [];

  // Fan out: schedule parallel activity calls
  const tasks = items.map((item) => ctx.callActivity(processWorkItem, item));

  // Wait for all tasks using whenAll
  const processedResults = yield whenAll(tasks);

  // Fan in: aggregate results
  const finalResult = yield ctx.callActivity(aggregateResults, processedResults);

  return finalResult;
};

// Activity that processes a single work item
const processWorkItem = async (_ctx, workItem) => {
  const normalizedItem = Number(workItem);
  const delayMs = 500 + Math.floor(Math.random() * 1500);
  console.log(`Processing work item ${normalizedItem} (delay ${delayMs}ms)`);

  await new Promise((resolve) => setTimeout(resolve, delayMs));

  return {
    item: normalizedItem,
    result: normalizedItem * normalizedItem,
  };
};

// Activity that aggregates results
const aggregateResults = async (_ctx, results) => {
  const sum = results.reduce((acc, curr) => acc + curr.result, 0);

  return {
    totalItems: results.length,
    sum,
    average: results.length ? sum / results.length : 0,
    results,
  };
};

// Build and start worker
const connectionString = `Endpoint=http://localhost:8080;Authentication=None;TaskHub=default`;
const worker = createAzureManagedWorkerBuilder(connectionString)
  .addOrchestrator(fanOutFanInOrchestrator)
  .addActivity(processWorkItem)
  .addActivity(aggregateResults)
  .build();

await worker.start();

client.mjs

مشروع العميل:

  • يستخدم نفس منطق connection string المستخدم في الوركر.
  • إنشاء قائمة بعناصر العمل التي ستتم معالجتها بالتوازي.
  • جدولة مثيل تزامن مع القائمة كإدخال.
  • ينتظر اكتمال التنسيق ويعرض النتائج المجمعة.
  • يستخدم waitForOrchestrationCompletion للاستطلاع الفعال.
import { OrchestrationStatus } from "@microsoft/durabletask-js";
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";

// Create work items array
const count = 10;
const workItems = Array.from({ length: count }, (_unused, index) => index + 1);

const connectionString = `Endpoint=http://localhost:8080;Authentication=None;TaskHub=default`;
const client = createAzureManagedClient(connectionString);

// Schedule orchestration
const instanceId = await client.scheduleNewOrchestration(
  "fanOutFanInOrchestrator",
  workItems
);

console.log(`Started orchestration with ID: ${instanceId}`);

// Wait for completion
const state = await client.waitForOrchestrationCompletion(instanceId, true, 120);

if (state.runtimeStatus === OrchestrationStatus.COMPLETED) {
  console.log(`Result: ${state.serializedOutput}`);
}

await client.stop();

الخطوات التالية

الآن بعد أن شغلت العينة محليا باستخدام محاكي Durable Task Scheduler، جرب إنشاء جدولة ومورد في مركز المهام ونشره على Azure Container Apps.