موجز تغيير العملية في خدمة Azure Blob Storage

يوفّر موجز التغيير سجلات معاملات لجميع التغييرات التي تحدث للنقاط وبيانات التعريف للنقطة في حساب التخزين الخاص بك. توضح لك هذه المقالة كيفية قراءة سجلات موجز التغيير باستخدام مكتبة معالج موجز تغيير الكائن الثنائي كبير الحجم.

لمعرفة المزيد حول موجز التغيير، انظر موجز التغيير في Azure Blob Storage.

الحصول على مكتبة معالج موجز تغيير الكائن الثنائي كبير الحجم

  1. افتح نافذة أمر (على سبيل المثال: Windows PowerShell).
  2. من دليل المشروع، ثبت حزمة Azure.Storage.Blobs.Changefeed NuGet .
dotnet add package Azure.Storage.Blobs --version 12.5.1
dotnet add package Azure.Storage.Blobs.ChangeFeed --version 12.0.0-preview.4

قِراءة السجلات

ملاحظة

موجز التغيير هو كيان غير قابل للتغيير، ومخصص للقراءة فقط في حساب التخزين الخاص بك. يمكن لأي عدد من التطبيقات قراءة ومعالجة موجز التغيير في وقت واحد وبشكل مستقل في الوقت المناسب بما يلائمها. لا تُزال السجلات من موجز التغيير عندما يقرؤها أحد التطبيقات. تكون حالة القراءة أو التكرار لكل قارئ مستهلك مستقلة وتُحفَظ من خلال تطبيقك فقط.

يتكرر هذا المثال بجميع السجلات في موجز التغييرات، ويضيفها إلى قائمة ثم يُعيد تلك القائمة إلى المتصل.

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Get all the events in the change feed. 
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

يطبع ذلك المثال إلى وحدة التحكم بعض القيم من كل سجل في القائمة.

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        string api = changeFeedEvent.EventData.Api;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Api: " + api);
    }
}

استئناف قراءة السجلات من صفحة محفوظة

يمكنك اختيار حفظ آخر صفحة تمت قراءتها في موجز التغييرات، ثم استئناف التكرار من خلال السجلات في وقت لاحق. يمكنك حفظ صفحة القراءة عن طريق الحصول على مؤشر موجز التغيير. المؤشر عبارة عن سلسلة ويمكن للتطبيق الخاص بك حفظ هذه السلسلة بأي طريقة منطقية بالنسبة لتصميم التطبيق الخاص بك (على سبيل المثال: إلى ملف أو قاعدة بيانات).

يكرر هذا المثال من خلال جميع السجلات في موجز التغييرات، ويضيفها إلى قائمة، ويحفظ المؤشر. تُعاد القائمة والمؤشر إلى المتصل.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync
    (string connectionString,  string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);             
    }

    // Update the change feed cursor.  The cursor is not required to get each page of events,
    // it is intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

دفق معالجة السجلات

يمكنك اختيار معالجة سجلات موجز التغيير كسجلات مثبته بموجز التغيير. راجع المواصفات. تُنشَر أحداث التغيير في موجز التغيير في فترة 60 ثانية في المتوسط. نوصي بإجراء استطلاع للتغييرات الجديدة مع وضع هذه الفترة في الاعتبار عِند تحديد الفاصل الزمني للاستطلاع.

يستقصي هذا المثال استطلاعات رأي دورية للتغييرات. في حال وجود سجلات التغيير، تعالج هذه التعليمة البرمجية هذه السجلات وتحفظ مؤشر موجز التغيير. وبهذه الطريقة إذا تم إيقاف العملية ثم بدء تشغيلها مرة أخرى، يُمكن للتطبيق استخدام المؤشر لاستئناف معالجة السجلات من حيث توقفت آخر مرة. يحفظ ذلك المثال المؤشر في ملف تكوين تطبيق محلي، ولكن يمكن للتطبيق حفظه بأي شكل من الأشكال الأكثر منطقية للسيناريو الخاص بك.

public async Task ChangeFeedStreamAsync
    (string connectionString, int waitTimeMs, string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor).AsPages().GetAsyncEnumerator();

        while (true) 
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    string api = changeFeedEvent.EventData.Api;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Api: " + api);
                }

                // helper method to save cursor. 
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }

}

public void SaveCursor(string cursor)
{
    System.Configuration.Configuration config = 
        ConfigurationManager.OpenExeConfiguration
        (ConfigurationUserLevel.None);

    config.AppSettings.Settings.Clear();
    config.AppSettings.Settings.Add("Cursor", cursor);
    config.Save(ConfigurationSaveMode.Modified);
}

قراءة سِجلات ضمن نطاق زمني

يمكنك قراءة السجلات التي تقع ضمن نطاق زمني مُحدد. يتكرر هذا المثال من خلال جميع السجلات في موجز التغيير الذي يقع في نطاق زمني بين الساعة 3:00 مساء في 2 مارس 2020 والساعة 2:00 صباحا في 7 أغسطس 2020، ويضيفها إلى قائمة، ثم يعيد تلك القائمة إلى المتصل.

اختيار شرائح لنطاق زمني

public async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2020, 3, 2, 15, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2020, 8, 7, 2, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

يتم تقريب وقت البدء الذي توفّره إلى أقرب ساعة ويتم تقريب الوقت إلى أقرب ساعة. من المحتمل أن يرى المستخدمون أحداثًا وقعت قبل وقت البدء وبعد وقت الانتهاء. من الممكن أيضًا ألا تظهر بعض الأحداث التي تحدث بين وقت البدء والانتهاء. وذلك لأنه قد يتم تسجيل الأحداث خلال الساعة السابقة لوقت البدء أو خلال الساعة التي تلي وقت الانتهاء.

الخطوات التالية