تغيير نموذج سحب التغذية في Azure Cosmos DB

ينطبق على: NoSQL

يمكنك استخدام نموذج سحب موجز التغيير لاستهلاك موجز تغيير Azure Cosmos DB بوتيرتك الخاصة. على غرار معالج موجز التغيير، يمكنك استخدام نموذج سحب موجز التغيير لموازاة معالجة التغييرات عبر العديد من مستهلكي موجز التغيير.

مقارنة بمعالج موجز التغيير

يمكن للعديد من السيناريوهات معالجة موجز التغيير باستخدام معالج موجز التغيير أو نموذج سحب موجز التغيير. تعمل الرموز المميزة للاستمرار لنموذج السحب وحاوية تأجير معالج موجز التغيير كعلامات مرجعية لآخر عنصر تمت معالجته أو دفعة من العناصر في موجز التغيير.

ومع ذلك، لا يمكنك تحويل رموز المتابعة المميزة إلى عقد تأجير أو العكس.

إشعار

في معظم الحالات، عندما تحتاج إلى القراءة من موجز التغيير، فإن أبسط خيار هو استخدام معالج موجز التغيير.

يجب أن تفكر في استخدام نموذج السحب في هذه السيناريوهات:

  • لقراءة التغييرات من مفتاح قسم معين.
  • للتحكم في سرعة تلقي العميل للتغييرات للمعالجة.
  • لإجراء قراءة لمرة واحدة للبيانات الموجودة في موجز التغيير (على سبيل المثال، لإجراء ترحيل البيانات).

فيما يلي بعض الاختلافات الرئيسية بين معالج موجز التغيير ونموذج سحب موجز التغيير:

ميزة تغيير معالج التغذية تغيير نموذج سحب الموجز
تتبع النقطة الحالية في معالجة موجز التغيير الإيجار (مخزناً في حاوية Azure Cosmos DB) رمز الاستمرار (المخزن في الذاكرة، أو المستمر يدويًّا)
القدرة على إعادة التغييرات السابقة نعم، مع نموذج الدفع نعم، مع نموذج السحب
الاقتراع من أجل التغييرات المستقبلية التحقق تلقائيا من التغييرات استنادا إلى القيمة المحددة من قبل WithPollInterval المستخدم يدوي
السلوك حيث لا توجد تغييرات جديدة انتظر القيمة WithPollInterval تلقائيا ثم أعد التحقق يجب التحقق من الحالة وإعادة الفحص يدويًّا
معالجة التغييرات من حاوية بأكملها نعم، ومتوازية تلقائيا عبر مؤشرات ترابط وأجهزة متعددة تستهلك من نفس الحاوية نعم، ومتوازية يدويا باستخدام FeedRange
معالجة التغييرات من مفتاح قسم واحد فقط غير مدعوم ‏‏نعم‬

إشعار

عند استخدام نموذج السحب، على عكس القراءة باستخدام معالج موجز التغيير، يجب معالجة الحالات التي لا توجد فيها تغييرات جديدة بشكل صريح.

العمل مع نموذج السحب

لمعالجة موجز التغيير باستخدام نموذج السحب، قم بإنشاء مثيل ل FeedIterator. عند إنشاء FeedIterator، يجب تحديد قيمة مطلوبة ChangeFeedStartFrom ، والتي تتكون من موضع البداية لقراءة التغييرات والقيمة التي تريد استخدامها ل FeedRange. FeedRange هو نطاق من قيم مفتاح القسم ويحدد العناصر التي يمكن قراءتها من موجز التغيير باستخدام ذلك المحدد FeedIterator. يجب أيضا تحديد قيمة مطلوبة ChangeFeedMode للوضع الذي تريد معالجة التغييرات فيه: أحدث إصدار أو جميع الإصدارات والحذف. استخدم إما ChangeFeedMode.LatestVersion أو ChangeFeedMode.AllVersionsAndDeletes للإشارة إلى الوضع الذي تريد استخدامه لقراءة موجز التغيير. عند استخدام جميع الإصدارات ووضع الحذف، يجب تحديد بدء موجز التغيير من قيمة إما Now() أو من رمز متابعة مميز معين.

يمكنك اختياريًّا تحديد ChangeFeedRequestOptionsto set aPageSizeHint. عند التعيين، تحدِّد هذه الخاصية الحد الأقصى لعدد العناصر المستلمة لكل صفحة. إذا تم تنفيذ العمليات في المجموعة المراقبة من خلال الإجراءات المخزنة، يتم الاحتفاظ بنطاق المعاملة عند قراءة العناصر من موجز التغيير. ونتيجة لذلك، قد يكون عدد العناصر المستلمة أعلى من القيمة المحددة بحيث يتم إرجاع العناصر التي تم تغييرها بواسطة نفس المعاملة كجزء من دفعة ذرية واحدة.

فيما يلي مثال على كيفية الحصول على FeedIterator أحدث وضع إصدار يقوم بإرجاع كائنات الكيان، وفي هذه الحالة عنصر User :

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

تلميح

قبل الإصدار 3.34.0، يمكن استخدام أحدث وضع إصدار عن طريق إعداد ChangeFeedMode.Incremental. سيشاهد كل من Incremental و LatestVersion إلى أحدث وضع إصدار من موجز التغيير والتطبيقات التي تستخدم أي من الوضعين السلوك نفسه.

جميع الإصدارات ووضع الحذف قيد المعاينة ويمكن استخدامها مع إصدارات >معاينة .NET SDK = 3.32.0-preview. فيما يلي مثال للحصول على FeedIterator في جميع الإصدارات ووضع الحذف الذي يرجع الكائنات الديناميكية:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

إشعار

في وضع الإصدار الأخير، تتلقى الكائنات التي تمثل العنصر الذي تم تغييره، مع بعض بيانات التعريف الإضافية. ترجع جميع الإصدارات ووضع الحذف نموذج بيانات مختلف. لمزيد من المعلومات، راجع تحليل كائن الاستجابة.

استهلاك موجز التغيير عبر التدفقات

FeedIterator لكل من وضعي موجز التغيير خياران. بالإضافة إلى الأمثلة التي ترجع كائنات الكيان، يمكنك أيضا الحصول على الاستجابة بدعم Stream . تسمح لك التدفقات بقراءة البيانات دون إلغاء تسلسلها أولا، لذلك يمكنك الحفظ على موارد العميل.

فيما يلي مثال على كيفية الحصول على FeedIterator في وضع الإصدار الأخير الذي يرجع Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

استهلاك التغييرات لحاوية بأكملها

إذا لم توفر معلمة FeedRange إلى FeedIterator، يمكنك معالجة موجز تغيير الحاوية بأكملها بالسرعة الخاصة بك. فيما يلي مثال، يبدأ بقراءة جميع التغييرات، بدءا من الوقت الحالي باستخدام أحدث وضع للإصدار:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

لأن موجز التغيير هو بشكل فعال قائمة لا نهائية من العناصر التي تشمل جميع عمليات الكتابة والتحديثات المستقبلية، فإن قيمة HasMoreResults دائما trueهي . عندما تحاول قراءة موجز التغيير ولا تتوفر أي تغييرات جديدة، تتلقى استجابة بالحالة NotModified . في المثال السابق، تتم معالجتها بالانتظار خمس ثوان قبل إعادة التحقق من التغييرات.

استهلاك التغييرات لمفتاح القسم

في بعض الحالات، قد تحتاج إلى معالجة التغييرات لمفتاح قسم معين فقط. يمكنك الحصول على مفتاح قسم معين ومعالجة التغييرات بنفس الطريقة التي يمكنك بها FeedIterator لحاوية بأكملها.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

استخدام FeedRange للتوازي

في تغيير معالج الخلاصة، ينتشر العمل تلقائيًّا عبر كثير من المستهلكين. في نموذج سحب التغذية المتغيرة، يمكنك استخدام FeedRange لموازاة معالجة تغذية التغيير. يمثل FeedRange نطاقاً من قيم مفتاح القسم.

فيما يلي مثال يوضح كيفية الحصول على قائمة النطاقات للحاوية الخاصة بك:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

عندما تحصل على قائمة FeedRange بالقيم للحاوية الخاصة بك، تحصل على واحدة FeedRange لكل قسم فعلي.

باستخدام FeedRange، يمكنك إنشاء FeedIterator لتوازي معالجة موجز التغيير عبر أجهزة أو مؤشرات ترابط متعددة. على عكس المثال السابق الذي أوضح كيفية الحصول على FeedIterator للحاوية كلها أو مفتاح قسم واحد، يمكنك استخدام FeedRanges للحصول على FeedIterators متعددة، والتي يمكنها معالجة موجز التغيير بالتوازي.

في حال رغبتك في استخدام FeedRanges، فأنت بحاجة إلى عملية منسق تحصل على FeedRanges وتوزعها على تلك الأجهزة. قد يكون هذا التوزيع:

  • استخدام FeedRange.ToJsonString وتوزيع قيمة السلسلة هذه. يمكن للمستهلكين استخدام هذه القيمة مع FeedRange.FromJsonString.
  • إذا كان التوزيع قيد التشغيل، فقم بتمرير مرجع الكائن FeedRange .

إليك عينة توضح كيفية القراءة من بداية موجز تغيير الحاوية باستخدام جهازين منفصلين افتراضيين يقرأان بالتوازي:

الجهاز 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

الجهاز 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

حفظ الرموز المميزة للمتابعة

يمكنك حفظ موضع FeedIterator الخاص بك بالحصول على رمز المتابعة. رمز الاستمرارية هو قيمة سلسلة تتعقب آخر تغييرات تمت معالجتها لـ FeedIterator وتسمح لـ FeedIterator بالاستئناف في هذه المرحلة لاحقاً. الرمز المميز للمتابعة، إذا تم تحديده، له الأسبقية على وقت البدء ويبدأ من قيم البداية. تقرأ التعليمات البرمجية التالية من خلال موجز التغيير منذ إنشاء الحاوية. بعد عدم توافر مزيد من التغييرات، سيستمر الرمز المميز للاستمرار بحيث يمكن استئناف تغيير استهلاك العلف لاحقاً.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

عند استخدام أحدث وضع إصدار، لا تنتهي صلاحية رمز المتابعة FeedIterator أبدا طالما أن حاوية Azure Cosmos DB لا تزال موجودة. عند استخدام جميع الإصدارات ووضع الحذف، يكون رمز المتابعة FeedIterator صالحا طالما حدثت التغييرات داخل نافذة الاستبقاء للنسخ الاحتياطية المستمرة.

الخطوات التالية