قم بتغيير معالج موجز التغيير في Azure Cosmos DB

ينطبق على: NoSQL

يعد معالج موجز التغيير جزءًا من Azure Cosmos DB .NET V3 وJava V4 SDKs. إنه يبسط عملية قراءة موجز التغيير وتوزيعات معالجة الحدث عبر مستهلكين متعددين بشكل فعال.

تتمثل الفائدة الرئيسية لاستخدام معالج موجز التغيير في تصميمه المتسامح مع الأخطاء، والذي يضمن تسليم "مرة واحدة على الأقل" لجميع الأحداث في موجز التغيير.

عدة تطوير البرامج SDK مدعومة

.Net V3 Java Node.JS Python

مكونات معالج موجز التغيير

يحتوي معالج موجز التغيير على أربعة مكونات رئيسية:

  • الحاوية المراقبة: تحتوي الحاوية المراقبة على البيانات التي يتم إنشاء موجز التغيير منها. تنعكس أي إدخالات وتحديثات للحاوية المراقبة في موجز التغيير الخاص بالحاوية.

  • حاوية التأجير: تعمل حاوية التأجير كمخزن حالة وتنسق معالجة موجز التغيير عبر العديد من العمال. يمكن تخزين حاوية الإيجار في نفس الحساب مثل الحاوية المراقبة أو في حساب منفصل.

  • مثيل الحساب : يستضيف مثيل الحساب معالج خلاصة التغيير للاستماع إلى التغييرات. اعتمادا على النظام الأساسي، قد يتم تمثيله بواسطة جهاز ظاهري (VM) أو جراب Kubernetes أو مثيل Azure App Service أو جهاز فعلي. يحتوي مثيل الحساب على معرف فريد يسمى اسم المثيل خلال هذه المقالة.

  • المفوض: المفوض هو التعليمات البرمجية التي تحدد ما تريد أنت، المطور، القيام به مع كل دفعة من التغييرات التي يقرأها معالج موجز التغيير.

لفهم كيفية عمل هذه العناصر الأربعة لمعالج موجز التغيير معا، دعونا ننظر إلى مثال في الرسم التخطيطي التالي. تخزن الحاوية المراقبة العناصر وتستخدم "المدينة" كمفتاح القسم. يتم توزيع قيم مفتاح القسم في نطاقات (يمثل كل نطاق قسما ماديا) التي تحتوي على عناصر.

يظهر الرسم التخطيطي مثيلين للحساب، ويقوم معالج موجز التغيير بتعيين نطاقات مختلفة لكل مثيل لزيادة توزيع الحوسبة إلى أقصى حد. كل مثيل له اسم مختلف وفريد.

تتم قراءة كل نطاق بالتوازي. يتم الحفاظ على تقدم النطاق بشكل منفصل عن النطاقات الأخرى في حاوية التأجير من خلال مستند التأجير . تمثل عقود الإيجار الحالة الحالية لمعالج التغذية بالتغيير.

مثال على معالج موجز التغيير

تنفيذ معالج موجز التغيير

يتوفر معالج موجز التغيير في .NET حاليا فقط لأحدث وضع إصدار. نقطة الإدخال هي دائما الحاوية المراقبة. في حالة Container ما، يمكنك استدعاء GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

المعلمة الأولى هي اسم مميز يصف هدف هذا المعالج. الاسم الثاني هو تنفيذ المفوض الذي يعالج التغييرات.

فيما يلي مثال على مفوض:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

بعد ذلك، يمكنك تعريف اسم مثيل الحساب أو المعرف الفريد باستخدام WithInstanceName. يجب أن يكون اسم مثيل الحساب فريدا ومختلفا لكل مثيل حساب تقوم بنشره. يمكنك تعيين الحاوية للحفاظ على حالة التأجير باستخدام WithLeaseContainer.

يمنحك الاتصال Build مثيل المعالج الذي يمكنك البدء به عن طريق استدعاء StartAsync.

دورة حياة المعالجة

دورة الحياة العادية لمثيل المضيف هي:

  1. اقرأ موجز التغيير.
  2. إذا لم تكن هناك تغييرات، فانتقل إلى #1 لفترة زمنية محددة مسبقا (قابلة للتخصيص باستخدام WithPollInterval في Builder).
  3. إذا كانت هناك تغييرات، أرسلها إلى المفوض.
  4. عندما ينتهي المفوض من معالجة التغييرات بنجاح، قم بتحديث مخزن التأجير بأحدث نقطة معالجة في الوقت المناسب وانتقل إلى #1.

معالجة الخطأ

معالج موجز التغيير قابل للتعامل مع أخطاء كود المستخدم. إذا كان تنفيذ المفوض الخاص بك يحتوي على استثناء غير معالج (الخطوة #4)، فإن مؤشر الترابط الذي يعالج مجموعة معينة من التغييرات يتوقف، ويتم إنشاء مؤشر ترابط جديد في النهاية. يتحقق مؤشر الترابط الجديد من أحدث نقطة زمنية قام مخزن التأجير بحفظها لهذا النطاق من قيم مفتاح القسم. تتم إعادة تشغيل مؤشر الترابط الجديد من هناك، وإرسال نفس دفعة التغييرات إلى المفوض بشكل فعال. يستمر هذا السلوك حتى يعالج المفوض التغييرات بشكل صحيح، وهذا هو السبب في أن معالج موجز التغيير لديه ضمان "مرة واحدة على الأقل".

إشعار

في سيناريو واحد فقط، لا تتم إعادة محاولة دفعة من التغييرات. إذا حدث الفشل في تنفيذ المفوض الأول على الإطلاق، لا يحتوي مخزن التأجير على حالة حفظ سابقة لاستخدامها في إعادة المحاولة. في هذه الحالات، تستخدم إعادة المحاولة تكوين البدء الأولي، والذي قد يتضمن أو لا يتضمن الدفعة الأخيرة.

لمنع معالج موجز التغيير من "التعثر" باستمرار في إعادة محاولة دفعة التغييرات نفسها، يجب إضافة منطق في التعليمات البرمجية للمفوض لكتابة المستندات، عند الاستثناء، إلى قائمة انتظار رسالة خطأ. يضمن هذا التصميم أنه يمكنك تتبع التغييرات غير المعالجة بينما تظل قادراً على الاستمرار في معالجة التغييرات المستقبلية. قد تكون قائمة انتظار الرسائل الخطأ حاوية Azure Cosmos DB أخرى. مخزن البيانات الدقيق لا يهم. تريد ببساطة استمرار التغييرات غير المعالجة.

يمكنك أيضا استخدام أداة تقدير موجز التغيير لمراقبة تقدم مثيلات معالج موجز التغيير أثناء قراءتها لموجز التغيير، أو يمكنك استخدام إعلامات دورة الحياة للكشف عن حالات الفشل الأساسية.

إعلامات دورة الحياة

يمكنك توصيل معالج موجز التغيير بأي حدث ذي صلة في دورة حياته. يمكنك اختيار إعلامك بواحد أو كل منهم. التوصية هي تسجيل إشعار الخطأ على الأقل:

  • قم بتسجيل معالج WithLeaseAcquireNotification ليتم إخطاره عندما يحصل المضيف الحالي على عقد إيجار لبدء معالجته.
  • قم بتسجيل معالج WithLeaseReleaseNotification ليتم إخطاره عندما يصدر المضيف الحالي عقد إيجار ويتوقف عن معالجته.
  • تسجيل معالج WithErrorNotification ليتم إعلامه عندما يواجه المضيف الحالي استثناء أثناء المعالجة. يجب أن تكون قادرا على تمييز ما إذا كان المصدر هو مفوض المستخدم (استثناء غير معالج) أو خطأ يواجهه المعالج عندما يحاول الوصول إلى الحاوية المراقبة (على سبيل المثال، مشكلات الشبكة).
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

وحدة الانتشار

تتكون وحدة توزيع معالج موجز التغيير الواحدة من مثيل حساب واحد أو أكثر لها نفس القيمة لنفس processorName تكوين حاوية التأجير، ولكن أسماء مثيلات مختلفة. يمكن أن يكون لديك العديد من وحدات التوزيع التي تحتوي فيها كل وحدة على تدفق عمل مختلف للتغييرات وتتكون كل وحدة توزيع من مثيل واحد أو أكثر.

على سبيل المثال، قد يكون لديك وحدة توزيع واحدة تقوم بتشغيل واجهة برمجة تطبيقات خارجية في كل مرة يحدث فيها تغيير في الحاوية الخاصة بك. قد تنقل وحدة نشر أخرى البيانات في الوقت الحقيقي في كل مرة يحدث فيها تغيير. عند حدوث تغيير في الحاوية المراقبة، يتم إعلام جميع وحدات التوزيع الخاصة بك.

تحجيم ديناميكي

كما ذكرنا سابقا، داخل وحدة التوزيع، يمكن أن يكون لديك مثيل حساب واحد أو أكثر. للاستفادة من توزيع الحوسبة داخل وحدة التوزيع، المتطلبات الرئيسية الوحيدة هي:

  • يجب أن تحتوي جميع المثيلات على نفس تكوين حاوية الإيجار.
  • يجب أن يكون لكافة المثيلات نفس القيمة ل processorName.
  • يجب أن يكون لكل مثيل اسم مثيل مختلف (WithInstanceName).

إذا تم تطبيق هذه الشروط الثلاثة، فإن معالج موجز التغيير يوزع جميع عقود الإيجار الموجودة في حاوية التأجير عبر جميع مثيلات تشغيل وحدة التوزيع هذه، ويتوازي مع الحساب باستخدام خوارزمية توزيع متساو. التأجير مملوك لمثيل واحد في أي وقت، لذلك لا ينبغي أن يكون عدد المثيلات أكبر من عدد عقود الإيجار.

يمكن أن يزداد عدد المثيلات ويتقلص. يقوم معالج موجز التغيير بضبط الحمل ديناميكيا عن طريق إعادة توزيعه وفقا لذلك.

علاوة على ذلك، يمكن لمعالج موجز التغيير ضبط مقياس الحاوية ديناميكيا إذا زاد معدل نقل الحاوية أو تخزينها. عند نمو الحاوية الخاصة بك، يعالج معالج موجز التغيير السيناريو بشفافية عن طريق زيادة عقود الإيجار بشكل ديناميكي وتوزيع عقود الإيجار الجديدة بين المثيلات الموجودة.

وقت البدء

بشكل افتراضي، عندما يبدأ معالج موجز التغيير لأول مرة، فإنه يقوم بتهيئة حاوية التأجير ويبدأ دورة حياة المعالجة الخاصة به. لا يتم الكشف عن أي تغييرات حدثت في الحاوية المراقبة قبل تهيئة معالج موجز التغيير للمرة الأولى.

القراءة من تاريخ ووقت سابقين

من الممكن تهيئة معالج موجز التغيير لقراءة التغييرات بدءا من تاريخ ووقت محددين عن طريق تمرير مثيل DateTime إلى ملحق المنشئ WithStartTime :

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

تتم تهيئة معالج موجز التغيير لهذا التاريخ والوقت المحددين، ويبدأ في قراءة التغييرات التي حدثت بعد ذلك.

القراءة من البداية

في سيناريوهات أخرى، كما هو الحال في عمليات ترحيل البيانات أو إذا كنت تقوم بتحليل محفوظات الحاوية بأكملها، فأنت بحاجة إلى قراءة موجز التغيير من بداية عمر تلك الحاوية. يمكنك استخدام WithStartTime على ملحق المنشئ، ولكن تمرير DateTime.MinValue.ToUniversalTime()، الذي ينشئ تمثيل UTC للحد الأدنى DateTime للقيمة كما في هذا المثال:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

تتم تهيئة معالج موجز التغيير، ويبدأ في قراءة التغييرات من بداية عمر الحاوية.

إشعار

تعمل خيارات التخصيص هذه فقط لإعداد نقطة البداية في وقت معالج موجز التغيير. بعد تهيئة حاوية التأجير لأول مرة، لا يكون لتغيير هذه الخيارات أي تأثير.

موجز التغيير والإنتاجية المقدمة

تغيير عمليات قراءة الموجز على الحاوية المراقبة تستهلك وحدات الطلب. تأكد من أن الحاوية المراقبة لا تواجه التقييد. يضيف التقييد تأخيرات في تلقي أحداث موجز التغيير على المعالجات الخاصة بك.

تستهلك العمليات على حاوية الإيجار (تحديث وصيانة الحالة) وحدات الطلب . كلما ارتفع عدد المثيلات التي تستخدم نفس حاوية التأجير، ارتفع الاستهلاك المحتمل لوحدات الطلب. تأكد من أن حاوية التأجير الخاصة بك لا تواجه التقييد. يضيف التقييد تأخيرات في تلقي أحداث موجز التغيير. يمكن أن ينتهي التقييد تماما من المعالجة.

مشاركة حاوية التأجير

يمكنك مشاركة حاوية تأجير عبر وحدات توزيع متعددة. في حاوية تأجير مشتركة، تستمع كل وحدة توزيع إلى حاوية مراقبة مختلفة أو لها قيمة مختلفة ل processorName. في هذا التكوين، تحتفظ كل وحدة توزيع بحالة مستقلة على حاوية التأجير. راجع استهلاك وحدة الطلب على حاوية تأجير للتأكد من أن معدل النقل المقدم كاف لجميع وحدات التوزيع.

تكوين التأجير المتقدم

يمكن أن تؤثر ثلاثة تكوينات رئيسية على كيفية عمل معالج موجز التغيير. يؤثر كل تكوين على استهلاك وحدة الطلب على حاوية التأجير. يمكنك تعيين أحد هذه التكوينات عند إنشاء معالج موجز التغيير، ولكن استخدمها بعناية:

  • التأجير الحصول: بشكل افتراضي، كل 17 ثانية. يتحقق المضيف بشكل دوري من حالة مخزن التأجير ويفكر في الحصول على عقود الإيجار كجزء من عملية التحجيم الديناميكي. تتم هذه العملية عن طريق تنفيذ استعلام على حاوية التأجير. يؤدي تقليل هذه القيمة إلى جعل إعادة التوازن والحصول على عقود الإيجار أسرع، ولكنه يزيد من استهلاك وحدة الطلب على حاوية التأجير.
  • انتهاء صلاحية التأجير: بشكل افتراضي، 60 ثانية. يحدد الحد الأقصى لمقدار الوقت الذي يمكن أن يتواجد فيه عقد الإيجار دون أي نشاط تجديد قبل الحصول عليه من قبل مضيف آخر. عند تعطل مضيف، يتم التقاط عقود الإيجار التي يمتلكها من قبل مضيفين آخرين بعد هذه الفترة الزمنية بالإضافة إلى الفاصل الزمني للتجديد المكون. يؤدي تقليل هذه القيمة إلى إجراء الاسترداد بعد تعطل المضيف بشكل أسرع، ولكن يجب ألا تكون قيمة انتهاء الصلاحية أقل من الفاصل الزمني للتجديد.
  • تجديد التأجير: بشكل افتراضي، كل 13 ثانية. يقوم المضيف الذي يمتلك عقد إيجار بتجديد عقد الإيجار بشكل دوري، حتى إذا لم تكن هناك تغييرات جديدة للاستهلاك. تتم هذه العملية عن طريق تنفيذ استبدال في عقد الإيجار. يؤدي تقليل هذه القيمة إلى تقليل الوقت المطلوب للكشف عن عقود الإيجار المفقودة بسبب تعطل المضيف، ولكنه يزيد من استهلاك وحدة الطلب على حاوية التأجير.

أين تستضيف معالج موجز التغيير

يمكن استضافة معالج موجز التغيير في أي نظام أساسي يدعم العمليات أو المهام طويلة الأمد. إليك بعض الأمثلة:

على الرغم من أن معالج موجز التغيير يمكن تشغيله في بيئات قصيرة الأجل لأن حاوية التأجير تحافظ على الحالة، فإن دورة بدء التشغيل لهذه البيئات تضيف تأخيرات إلى الوقت الذي يستغرقه تلقي الإعلامات (بسبب الحمل الزائد لبدء تشغيل المعالج في كل مرة يتم فيها بدء تشغيل البيئة).

الموارد الإضافية

الخطوات التالية

تعرف على المزيد حول معالج موجز التغيير في المقالات التالية: