ما هي Durable Functions؟

Durable Functions هي ميزة من ميزات Azure Functions التي تتيح لك كتابة وظائف ذات حالة في بيئة حساب بلا خادم. يتيح لك الملحق تحديد مهام سير العمل المناسبة عن طريق كتابة دالات المنسقوالكيانات المناسبة عن طريق كتابة دالات الكيانباستخدام نموذج برمجة Azure Functions. في الخلفية، يدير الملحق الحالة ونقاط التحقق وإعادة التشغيل، مما يسمح لك بالتركيز على منطق عملك.

اللغات المدعومة

تم تصميم Durable Functions للعمل مع جميع لغات برمجة Azure Functions ولكن قد يكون لها حد أدنى مختلف من المتطلبات لكل لغة. يوضح الجدول التالي الحد الأدنى من تكوينات التطبيق المدعومة:

مكدس اللغة إصدارات Azure Functions Runtime نسخة عامل اللغة الحد الأدنى من إصدار المجموعات
.NET / C# / F#‎ +‎Functions 1.0 قيد المعالجة
خارج العملية
غير متوفر
JavaScript/TypeScript (نموذج V3 prog) +‎Functions 2.0 العقدة 8+ مجموعات 2.x
JavaScript/TypeScript (نموذج V4 prog) الدالات 4.25+ العقدة 18+ أكثر من 3.15 حزم
Python +‎Functions 2.0 Python 3.7+ مجموعات 2.x
Python (نموذج V2 prog) الدالات 4.0+ Python 3.7+ أكثر من 3.15 حزم
PowerShell +‎Functions 3.0 +PowerShell 7 مجموعات 2.x
Java الدالات 4.0+ +Java 8 مجموعات 4.x

هام

تستخدم هذه المقالة علامات التبويب لدعم إصدارات متعددة من نموذج البرمجة Node.js. يتوفر نموذج v4 بشكل عام وتم تصميمه للحصول على تجربة أكثر مرونة وبديهية لمطوري JavaScript وTypeScript. لمزيد من التفاصيل حول كيفية عمل نموذج v4، راجع دليل مطور Azure Functions Node.js. لمعرفة المزيد حول الاختلافات بين v3 وv4، راجع دليل الترحيل.

هام

تستخدم هذه المقالة علامات التبويب لدعم إصدارات متعددة من نموذج برمجة Python. نموذج v2 متاح بشكل عام ومصمم لتوفير طريقة أكثر تركز على التعليمات البرمجية لوظائف التأليف من خلال مصممي الديكور. لمزيد من التفاصيل حول كيفية عمل نموذج v2، راجع دليل مطور Azure Functions Python.

مثل Azure Functions، هناك قوالب لمساعدتك في تطوير Durable Functions باستخدام Visual Studio وVisual Studio Code ومدخل Azure.

أنماط التطبيق

تتمثل حالة الاستخدام الأساسي لـ Durable Functions في تبسيط متطلبات التنسيق المعقدة المناسبة في التطبيقات بلا خادم. تصف المقاطع التالية أنماط التطبيق النموذجية التي يمكن أن تستفيد من Durable Functions:

النمط #1: سلسلة الدالات

في نمط تسلسل الدالة، يتم تنفيذ سلسلة من الدالات بترتيب معين. في هذا النمط، يتم تطبيق إخراج دالة واحدة على إدخال دالة أخرى. يضمن استخدام قوائم الانتظار بين كل دالة أن النظام يبقى دائما وقابلا للتطوير، على الرغم من وجود تدفق للتحكم من دالة إلى أخرى.

رسم تخطيطي لنمط تسلسل الدالات

يمكنك استخدام Durable Functions لتنفيذ نمط تسلسل الدالة بإيجاز كما هو موضح في المثال التالي.

في هذا المثال، القيم F1 وF2 وF3 وF4 أسماء دالات أخرى في نفس تطبيق الدالة. يمكنك تنفيذ تدفق التحكم باستخدام بنيات الترميز الحتمي العادي. يتم تنفيذ الرمز من أعلى إلى أسفل. يمكن أن يتضمن الرمز دلالات تدفق التحكم اللغوي الموجودة، مثل الشرطيات والحلقات. يمكنك تضمين منطق معالجة الأخطاء في الكتل try/catch/finally.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

يمكنك استخدام المعلمة context لاستدعاء دالات أخرى بالاسم، ومعلمات المرور، وإرجاع إخراج الدالة. في كل مرة يستدعي فيها الرمز await، يتحقق إطار عمل Durable Functions من تقدم مثيل الدالة الحالية. إذا تمت إعادة تشغيل العملية أو الجهاز الظاهري أثناء التنفيذ، فسيتم استئناف مثيل الدالة من استدعاء await السابق. لمزيد من المعلومات، راجع القسم التالي، النمط #2:توزيع المهام إلى عدة وجهات/توزيع المهام إلى نفس الوجهة.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

يمكنك استخدام الكائن context.df لاستدعاء دالات أخرى بالاسم، ومعلمات المرور، وإرجاع إخراج الدالة. في كل مرة يستدعي فيها الرمز yield، يتحقق إطار عمل Durable Functions من تقدم مثيل الدالة الحالية. إذا تمت إعادة تشغيل العملية أو الجهاز الظاهري أثناء التنفيذ، فسيتم استئناف مثيل الدالة من استدعاء yield السابق. لمزيد من المعلومات، راجع القسم التالي، النمط #2:توزيع المهام إلى عدة وجهات/توزيع المهام إلى نفس الوجهة.

إشعار

الكائن context في JavaScript يمثل سياق الدالة الكامل. ادخل إلى سياق Durable Functions باستخدام خاصية df في السياق الرئيسي.

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result


main = df.Orchestrator.create(orchestrator_function)

يمكنك استخدام الكائن context لاستدعاء دالات أخرى بالاسم، ومعلمات المرور، وإرجاع إخراج الدالة. في كل مرة يستدعي فيها الرمز yield، يتحقق إطار عمل Durable Functions من تقدم مثيل الدالة الحالية. إذا تمت إعادة تشغيل العملية أو الجهاز الظاهري أثناء التنفيذ، فسيتم استئناف مثيل الدالة من استدعاء yield السابق. لمزيد من المعلومات، راجع القسم التالي، النمط #2:توزيع المهام إلى عدة وجهات/توزيع المهام إلى نفس الوجهة.

إشعار

الكائن context في Python يمثل سياق التنسيق. ادخل إلى سياق Azure Functions باستخدام خاصية function_context في سياق التنسيق.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

يمكنك استخدام الأمر Invoke-DurableActivity لاستدعاء دالات أخرى بالاسم، ومعلمات المرور، وإرجاع إخراج الدالة. في كل مرة يستدعي فيها الرمز Invoke-DurableActivity، بدون تبديل NoWait، يتحقق إطار عمل Durable Functions من تقدم مثيل الدالة الحالية. إذا تمت إعادة تشغيل العملية أو الجهاز الظاهري أثناء التنفيذ، فسيتم استئناف مثيل الدالة من استدعاء Invoke-DurableActivity السابق. لمزيد من المعلومات، راجع القسم التالي، النمط #2:توزيع المهام إلى عدة وجهات/توزيع المهام إلى نفس الوجهة.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

يمكنك استخدام الكائن ctx لاستدعاء دالات أخرى بالاسم، ومعلمات المرور، وإرجاع إخراج الدالة. إخراج هذه الأساليب هو كائن Task<V> حيث V هو نوع البيانات التي تم إرجاعها بواسطة الدالة التي تم استدعاؤها. في كل مرة تتصل فيها بـ Task<V>.await()، يقوم إطار عمل الوظائف الدائمة بنقاط التحقق من تقدم مثيل الوظيفة الحالية. إذا تمت إعادة تدوير العملية بشكل غير متوقع في منتصف الطريق خلال التنفيذ، فسيتم استئناف مثيل الوظيفة من استدعاء Task<V>.await() السابق. لمزيد من المعلومات، راجع القسم التالي، النمط #2:توزيع المهام إلى عدة وجهات/توزيع المهام إلى نفس الوجهة.

النمط #2: توزيع المهام إلى عدة وجهات/توزيع المهام إلى نفس الوجهة

في نمط توزيع المهام إلى عدة وجهات/توزيع المهام إلى نفس الوجهة، يمكنك تنفيذ دالات متعددة في نفس الوقت، ومن ثم الانتظار حتى انتهاء جميع الدالات. في كثير من الأحيان، يتم إجراء بعض أعمال التجميع على النتائج التي يتم إرجاعها من الدالات.

رسم تخطيطي لنمط توزيع المهام إلى عدة وجهات/توزيع المهام إلى نفس الوجهة

مع الدالات العادية، يمكنك توزيع المهام إلى عدة وجهات عن طريق إرسال الدالة رسائل متعددة إلى قائمة انتظار. عودة توزيع المهام إلى نفس الوجهة أصعب بكثير. لتوزيع المهام إلى نفس الوجهة في دالة عادية، يمكنك كتابة الرمز لتعقبه عند انتهاء الدالات التي تم تشغيلها في قائمة الانتظار، ومن ثَمَّ تخزين مخرجات الدالة.

يعالج ملحق Durable Functions هذا النمط برمز بسيط نسبيًا:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

يتم توزيع عمل «توزيع المهام إلى عدة وجهات» على مثيلات عديدة لدالة F2. يتم تعقب العمل باستخدام قائمة مهام ديناميكية. يتم استدعاء Task.WhenAll لانتظار جميع الدالات التي تم استدعاؤها لتنتهي. ثم يتم تجميع مخرجات الدالة F2 من قائمة المهام الديناميكية وتمريرها إلى الدالة F3.

يضمن التحقق التلقائي الذي يحدث في استدعاء await في Task.WhenAll أن أي عطل محتمل أثناء التشغيل أو إعادة التمهيد لا يتطلب إعادة تشغيل مهمة اكتملت بالفعل.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

يتم توزيع عمل «توزيع المهام إلى عدة وجهات» على مثيلات عديدة لدالة F2. يتم تعقب العمل باستخدام قائمة مهام ديناميكية. يتم استدعاء واجهة برمجة التطبيقات context.df.Task.all لانتظار جميع الدالات التي تم استدعاؤها لتنتهي. ثم يتم تجميع مخرجات الدالة F2 من قائمة المهام الديناميكية وتمريرها إلى الدالة F3.

يضمن التحقق التلقائي الذي يحدث في استدعاء yield في context.df.Task.all أن أي عطل محتمل أثناء التشغيل أو إعادة التمهيد لا يتطلب إعادة تشغيل مهمة اكتملت بالفعل.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

يتم توزيع عمل «توزيع المهام إلى عدة وجهات» على مثيلات عديدة لدالة F2. يتم تعقب العمل باستخدام قائمة مهام ديناميكية. يتم استدعاء واجهة برمجة التطبيقات context.task_all لانتظار جميع الدالات التي تم استدعاؤها لتنتهي. ثم يتم تجميع مخرجات الدالة F2 من قائمة المهام الديناميكية وتمريرها إلى الدالة F3.

يضمن التحقق التلقائي الذي يحدث في استدعاء yield في context.task_all أن أي عطل محتمل أثناء التشغيل أو إعادة التمهيد لا يتطلب إعادة تشغيل مهمة اكتملت بالفعل.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

يتم توزيع عمل «توزيع المهام إلى عدة وجهات» على مثيلات عديدة لدالة F2. الرجاء ملاحظة استخدام مفتاح NoWait في استدعاء الوظيفة F2 : يتيح هذا المفتاح للمنسق متابعة استدعاء F2 دون انتظار اكتمال النشاط. يتم تعقب العمل باستخدام قائمة مهام ديناميكية. يتم استدعاء الأمر Wait-ActivityFunction لانتظار جميع الدالات التي تم استدعاؤها لتنتهي. ثم يتم تجميع مخرجات الدالة F2 من قائمة المهام الديناميكية وتمريرها إلى الدالة F3.

يضمن التحقق التلقائي الذي يحدث في استدعاء Wait-ActivityFunction أن أي عطل محتمل أثناء التشغيل أو إعادة التمهيد لا يتطلب إعادة تشغيل مهمة اكتملت بالفعل.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

يتم توزيع عمل «توزيع المهام إلى عدة وجهات» على مثيلات عديدة لدالة F2. يتم تعقب العمل باستخدام قائمة مهام ديناميكية. يتم استدعاء ctx.allOf(parallelTasks).await() لانتظار جميع الدالات التي تم استدعاؤها لتنتهي. بعد ذلك، يتم تجميع مخرجات الدالة F2 من قائمة المهام الديناميكية ويتم إرجاعها كإخراج للدالة المنسقة.

تضمن نقاط التحقق التلقائية التي تحدث عند .await() المكالمة ctx.allOf(parallelTasks) أن عملية إعادة التدوير غير المتوقعة لا تتطلب إعادة تشغيل أي مهام مكتملة بالفعل.

إشعار

في حالات نادرة، من الممكن أن يحدث عطل في النافذة بعد اكتمال دالة نشاط ولكن قبل حفظ الاكتمال في سجل التنسيق. إذا حدث هذا، فستقوم دالة النشاط بالعمل من البداية بعد استرداد العملية.

النمط #3: واجهات برمجة تطبيقات HTTP غير المتزامنة

يعالج نمط واجهة برمجة تطبيقات HTTP غير المتزامنة مشكلة تنسيق حالة عمليات التشغيل طويلة الأمد مع العملاء الخارجيين. هناك طريقة شائعة لتنفيذ هذا النمط عن طريق وجود نقطة نهاية HTTP تشغِّل الإجراء طويل الأمد. ثم إعادة توجيه العميل إلى نقطة نهاية حالة حيث يحاول العميل الاستقصاء لمعرفة وقت انتهاء العملية.

رسم تخطيطي لنمط واجهة برمجة تطبيقات HTTP

توفر Durable Functions دعم الأجزاء المدمجة لهذا النمط، لتبسيط أو حتى التخلص من الرمز الذي تحتاج لكتابته للتفاعل مع عمليات تنفيذ الدالة طويلة الأمد. على سبيل المثال، تظهر نماذج التشغيل السريع ل Durable Functions (C#‎ وJavaScript وTypeScript وPython وPowerShell وJava) أمر REST بسيطا يمكنك استخدامه لبدء مثيلات دالة منسق جديدة. بعد بدء تشغيل مثيل، يعرض الملحق واجهات برمجة تطبيقات HTTP للإخطار على الويب التي تستعلم عن حالة دالة المنسق.

يظهر المثال التالي أوامر REST التي تبدأ بتشغيل منسق وتستعلم عن حالته. للتوضيح، يتم حذف بعض تفاصيل البروتوكول من المثال.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

لأن وقت تشغيل Durable Functions يُدير الحالة بالنسبة لك، لا تحتاج إلى تنفيذ آلية تتبع الحالة الخاصة بك.

يعرض ملحق Durable Functions واجهات برمجة التطبيقات HTTP المدمجة التي تُدير التنسيقات طويلة الأمد. يمكنك بدلا من ذلك تنفيذ هذا النمط بنفسك باستخدام مشغلات الدالة الخاصة بك (مثل HTTP أو قائمة انتظار أو مراكز أحداث Azure) وربط العميل الدائم. على سبيل المثال، قد تستخدم رسالة قائمة انتظار لتشغيل الإنهاء. أو يمكنك استخدام مشغل HTTP محمي بواسطة نهج مصادقة Microsoft Entra بدلا من واجهات برمجة تطبيقات HTTP المضمنة التي تستخدم مفتاحا تم إنشاؤه للمصادقة.

لمزيد من المعلومات، راجع المقالة ميزات HTTP والتي توضح كيف يمكنك كشف العمليات غير المتزامنة طويلة الأمد عبر HTTP باستخدام ملحق Durable Functions.

النمط #4: المراقبة

يُشير نمط المراقبة إلى عملية متكررة ومرنة في سير العمل. مثال على ذلك الاستقصاء حتى يتم استيفاء شروط محددة. يمكنك استخدام مشغل مؤقت منتظم لمعالجة سيناريو أساسي، مثل مهمة تنظيف دورية، لكن الفاصل الزمني لها ثابت وتصبح إدارة دورات حياة المثيلات معقدة. يمكنك استخدام Durable Functions لإنشاء فواصل زمنية مرنة للتكرار، وإدارة دورات حياة المهام، وإنشاء عمليات مراقبة متعددة من تنسيق واحد.

ومن أمثلة نمط المراقبة عكس سيناريو واجهة برمجة تطبيقات HTTP غير المتزامنة السابقة. بدلاً من تعريض نقطة نهاية لعميل خارجي لمراقبة عملية طويلة الأمد، تستهلك المراقبة طويلة الأمد نقطة نهاية خارجية، ثم تنتظر تغيير حالة.

رسم تخطيطي لنمط المراقبة

في أسطر قليلة من الرمز، يمكنك استخدام Durable Functions لإنشاء مراقبات متعددة تلاحظ نقاط النهاية الإجبارية. يمكن أن تنهي المراقبات التنفيذ عند استيفاء شرط، أو يمكن أن تستخدم دالة أخرى عميل التنسيق الدائم لإنهاء المراقبات. يمكنك تغيير الفاصل الزمني للمراقبة wait استنادًا إلى شرط معين (على سبيل المثال، التراجع الأسي.)

ينفذ الرمز التالي مراقبة أساسية:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

عند تلقي طلب، يتم إنشاء مثيل تنسيق جديد لمعرف الوظيفة هذا. يستقصي المثيل حالة حتى يتم استيفاء أحد الشروط أو حتى انتهاء صلاحية المهلة. يتحكم مؤقت دائم في الفاصل الزمني للاستقصاء. ثم، يمكن تنفيذ المزيد من العمل أو يمكن إنهاء التنسيق.

النمط #5: التفاعل البشري

العديد من العمليات التلقائية تنطوي على نوع من التفاعل البشري. إشراك البشر في عملية تلقائية أمر صعب؛ لأن الأشخاص ليسوا متاحين بدرجة كبيرة وليسوا مستجيبين مثل الخدمات السحابية. قد تسمح عملية تلقائية لهذا التفاعل باستخدام المهلات ومنطق التعويض.

عملية الموافقة هي مثال على عملية الأعمال التي تنطوي على تفاعل بشري. قد تكون هناك حاجة إلى موافقة المدير لتقرير المصروفات الذي يتجاوز مبلغًا معينًا بالدولار. إذا لم يوافق المدير على تقرير المصروفات في غضون 72 ساعة (قد يكون المدير قد ذهب في إجازة)، تبدأ عملية التصعيد للحصول على الموافقة من شخص آخر (ربما مدير المدير).

رسم تخطيطي لنمط التفاعل البشري

يمكنك تطبيق النمط في هذا المثال باستخدام دالة منسق. يستخدم التنسيق مؤقتًا دائمًا لطلب الموافقة. يقوم المنسق بالتصعيد إذا انتهت المهلة. ينتظر المنسق حدثًا خارجيًا، مثل إعلام يتم إنشاؤه بواسطة تفاعل بشري.

هذه الأمثلة تنشئ عملية موافقة لإظهار نمط التفاعل البشري:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

لإنشاء المؤقت الدائم، استدع context.CreateTimer. يتم تلقي الإعلام من قبل context.WaitForExternalEvent. ثم، يتم استدعاء Task.WhenAny لاتخاذ قرار إما بالتصعيد (انتهاء المهلة يحدث أولاً) وإما معالجة الموافقة (يتم تلقي الموافقة قبل انتهاء المهلة).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

لإنشاء المؤقت الدائم، استدع context.df.createTimer. يتم تلقي الإعلام من قبل context.df.waitForExternalEvent. ثم، يتم استدعاء context.df.Task.any لاتخاذ قرار إما بالتصعيد (انتهاء المهلة يحدث أولاً) وإما معالجة الموافقة (يتم تلقي الموافقة قبل انتهاء المهلة).

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

لإنشاء المؤقت الدائم، استدع context.create_timer. يتم تلقي الإعلام من قبل context.wait_for_external_event. ثم، يتم استدعاء context.task_any لاتخاذ قرار إما بالتصعيد (انتهاء المهلة يحدث أولاً) وإما معالجة الموافقة (يتم تلقي الموافقة قبل انتهاء المهلة).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

لإنشاء المؤقت الدائم، استدع Start-DurableTimer. يتم تلقي الإعلام من قبل Start-DurableExternalEventListener. ثم، يتم استدعاء Wait-DurableTask لاتخاذ قرار إما بالتصعيد (انتهاء المهلة يحدث أولاً) وإما معالجة الموافقة (يتم تلقي الموافقة قبل انتهاء المهلة).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

يوقف استدعاء الطريقة ctx.waitForExternalEvent(...).await() التزامن مؤقتاً حتى يتلقى حدثاً باسم ApprovalEvent، والذي يحتوي على حمولة boolean. إذا تم استلام الحدث، يتم استدعاء وظيفة نشاط لمعالجة نتيجة الموافقة. ومع ذلك، في حالة عدم تلقي مثل هذا الحدث قبل انتهاء صلاحية timeout (72 ساعة)، يتم رفع TaskCanceledException ويتم استدعاء وظيفة النشاط Escalate.

إشعار

لا توجد رسوم مقابل الوقت المستغرق في انتظار الأحداث الخارجية عند التشغيل في خطة الاستهلاك.

يمكن لعميل خارجي تسليم إعلام الحدث إلى دالة منسق انتظار باستخدام واجهات برمجة تطبيقات HTTP المدمجة:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

يمكن أيضًا رفع حدث باستخدام عميل التنسيق الدائم من دالة أخرى في نفس تطبيق الدالة:

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

النمط #6: المجمع (الكيانات المناسبة)

النمط السادس يتألف من تجميع بيانات الحدث على مدى فترة زمنية في كيان واحد قابل للعنوان. في هذا النمط، قد تأتي البيانات التي يتم تجميعها من مصادر متعددة، أو قد يتم تسليمها على دفعات، أو قد تكون مبعثرة على فترات زمنية طويلة. قد يحتاج المجمع إلى اتخاذ إجراء بشأن بيانات الحدث عند وصولها، وقد يحتاج العملاء الخارجيون إلى الاستعلام عن البيانات المجمعة.

مخطط مجمع

الشيء الصعب حول محاولة تنفيذ هذا النمط مع دالات عادية وبلا حالة هو أن التحكم في التزامن يصبح تحديًا كبيرًا. لست فقط بحاجة إلى القلق بشأن مؤشرات ترابط متعددة تعمل على تعديل نفس البيانات في نفس الوقت، وإنما بحاجة أيضًا إلى القلق بشأن ضمان أن المجمع يعمل فقط على جهاز ظاهري واحد في المرة.

يمكنك استخدام الكيانات الدائمة لتنفيذ هذا النمط بسهولة كدالة واحدة.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

يمكن أيضًا نمذجة الكيانات الدائمة كفئات في .NET. يمكن أن يكون هذا النمط مفيدًا إذا تم إصلاح قائمة العمليات وأصبحت كبيرة. المثال التالي هو تنفيذ مكافئ لكيان Counter باستخدام أساليب وفئات .NET.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

إشعار

الكيانات الدائمة غير مدعمة حاليًا في PowerShell.

إشعار

الكيانات المتينة غير مدعومة حالياً في Java.

يمكن للعملاء إدراج العمليات (تُعرف أيضًا بـ "إرسال الإشارات") لدالة كيان باستخدام ربط عميل الكيان.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

إشعار

يتوفر الوكلاء الذين تم إنشاؤهم ديناميكيًا أيضًا في .NET لكيانات إرسال الإشارات بطريقة أمان النوع. إضافة إلى إرسال الإشارات، يمكن للعملاء أيضًا الاستعلام عن حالة دالة كيان باستخدام أساليب أمان النوع على ربط عميل التنسيق.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

تتوفر دالات الكيان في Durable Functions 2.0 والأحدث بالنسبة إلى C#، وJavaScript، وPython.

التكنولوجيا

في الخلفية، تم بناء ملحق Durable Functions فوق Durable Task Framework، وهي مكتبة مفتوحة المصدر على GitHub تُستخدم لبناء سير العمل في الرمز. مثل Azure Functions التي تمثل تطورًا بلا خادم لـ Azure WebJobs، فإن Durable Functions هي تطور بلا خادم لـ Durable Task Framework. تستخدم Microsoft والمؤسسات الأخرى Durable Task Framework بشكل مكثف لأتمتة العمليات ذات المهام الحرجة. فهو مناسب بشكل طبيعي لبيئة Azure Functions بلا خادم.

قيود الرمز

لتوفير ضمانات تنفيذ موثوقة وطويلة الأمد، تحتوي دالات التنسيق على مجموعة من قواعد الترميز التي يجب اتباعها. لمزيد من المعلومات، راجع المقالة قيود رمز دالة المنسق.

الفوترة

تتم فوترة Durable Functions مثل Azure Functions. للحصول على مزيدٍ من المعلومات، راجع تسعير Azure Functions. عند تنفيذ دوال المنسق في Azure Functions خطة الاستهلاك، يجب أن تكون على دراية ببعض سلوكيات الفوترة. لمزيد من المعلومات حول هذه السلوكيات، راجع مقالة فوترة Durable Functions.

تمكن من الاستخدام بسرعة

يمكنك البدء مع Durable Functions في أقل من 10 دقائق عن طريق استكمال واحدة من هذه البرامج التعليمية للتشغيل السريع بلغة محددة:

في عمليات التشغيل السريع هذه يمكنك إنشاء واختبار دالة "مرحبًا بالعالم" الدائمة محليًا. ثم نشر رمز الدالة إلى Azure. الدالة التي تقوم بإنشائها تقوم بتنسيق وتسلسل المكالمات إلى دالات أخرى.

المنشورات

تم تطوير الوظائف الدائمة بالتعاون مع Microsoft Research. ونتيجة لذلك، ينتج فريق "الوظائف المتينة" بنشاط الأوراق البحثية والتحف؛ وتشمل هذه:

معرفة المزيد

يوضح الفيديو التالي مزايا Durable Functions:

لأن Durable Functions ملحق متقدم لـ Azure Functions، فهو ليس مناسبًا لكل التطبيقات. للمقارنة بتقنيات تنسيق Azure الأخرى، راجع مقارنة Azure Functions وتطبيقات Azure Logic Apps.

الخطوات التالية