Bagikan melalui


Memproses umpan perubahan di Azure Blob Storage

Umpan perubahan menyediakan log transaksi dari semua perubahan yang terjadi pada blob dan metadata blob di akun penyimpanan Anda. Artikel ini memperlihatkan kepada Anda cara membaca rekaman umpan perubahan dengan pustaka prosesor umpan perubahan blob.

Untuk mempelajari selengkapnya tentang umpan perubahan, lihat Umpan perubahan di Azure Blob Storage.

Menyiapkan proyek Anda

Bagian ini memancang Anda menyiapkan proyek untuk bekerja dengan pustaka klien Blobs Change Feed untuk .NET.

Memasang paket

Dari direktori proyek Anda, instal paket untuk pustaka klien Umpan Perubahan Blob Azure Storage untuk .NET menggunakan dotnet add package perintah . Dalam contoh ini, kami menambahkan --prerelease bendera ke perintah untuk menginstal versi pratinjau terbaru.

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

Contoh kode dalam artikel ini juga menggunakan paket Azure Blob Storage dan Azure Identity .

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

Tambahkan direktif using

Tambahkan arahan berikut using ke file kode Anda:

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

Membuat objek klien

Untuk menyambungkan aplikasi ke Blob Storage, buat instans BlobServiceClient kelas . Contoh berikut menunjukkan cara membuat objek klien menggunakan DefaultAzureCredential untuk otorisasi. Untuk mempelajari selengkapnya, lihat Mengotorisasi akses dan menyambungkan ke Blob Storage. Untuk bekerja dengan umpan perubahan, Anda memerlukan peran bawaan Azure RBAC Storage Blob Data Reader atau yang lebih tinggi.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

Objek klien diteruskan sebagai parameter ke beberapa metode yang ditunjukkan dalam artikel ini.

Membaca rekaman dalam umpan perubahan

Catatan

Umpan perubahan adalah entitas imutabel hanya-baca di akun penyimpanan Anda. Sejumlah aplikasi dapat membaca dan memproses umpan perubahan secara bersamaan dan independen kapan saja. Rekaman tidak dihapus dari umpan perubahan saat aplikasi membacanya. Status baca atau iterasi dari setiap pembaca yang mengonsumsi bersifat independen dan dikelola hanya oleh aplikasi Anda.

Contoh kode berikut mengulangi semua rekaman dalam umpan perubahan, menambahkannya ke daftar, lalu mengembalikan daftar peristiwa umpan perubahan:

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Contoh kode berikut mencetak beberapa nilai dari daftar peristiwa umpan perubahan:

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

Lanjut membaca rekaman dari posisi tersimpan

Anda bisa memilih untuk menyimpan posisi baca Anda di umpan perubahan, lalu melanjutkan iterasi melalui rekaman di waktu mendatang. Anda bisa menyimpan posisi baca dengan mendapatkan kursor umpan perubahan. Kursor adalah string dan aplikasi Anda dapat menyimpan string tersebut dengan cara apa pun yang masuk akal untuk desain aplikasi Anda, misalnya, ke file atau database.

Contoh ini beriterasi melalui semua rekaman di umpan perubahan, menambahkannya ke daftar, dan menyimpan kursor. Daftar dan kursor dikembalikan ke pemanggil.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Pemrosesan stream rekaman

Anda dapat memilih untuk memproses rekaman umpan perubahan saat mereka berkomitmen pada umpan perubahan. Lihat Spesifikasi. Peristiwa perubahan diterbitkan ke umpan perubahan pada periode rata-rata 60 detik. Kami menyarankan agar Anda melakukan polling perubahan baru periode ini saat menentukan interval polling Anda.

Contoh ini secara berkala melakukan polling untuk perubahan. Jika ada perubahan rekaman, kode ini memproses rekaman tersebut dan menyimpan kursor umpan perubahan. Dengan begitu jika proses dihentikan lalu dimulai lagi, aplikasi dapat menggunakan kursor untuk melanjutkan pemrosesan rekaman tempatnya terakhir kali ditinggal. Contoh ini menyimpan kursor ke file lokal untuk tujuan demonstrasi, tetapi aplikasi Anda dapat menyimpannya dalam bentuk apa pun yang paling masuk akal untuk skenario Anda.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

Membaca rekaman dalam rentang waktu tertentu

Anda dapat membaca rekaman yang berada di rentang waktu tertentu. Contoh ini berulang melalui semua rekaman dalam umpan perubahan yang termasuk dalam rentang tanggal dan waktu tertentu, menambahkannya ke daftar, dan mengembalikan daftar:

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Waktu mulai yang Anda berikan dibulatkan ke bawah ke jam terdekat dan waktu akhir dibulatkan hingga jam terdekat. Ada kemungkinan bahwa pengguna mungkin melihat kejadian sebelum waktu mulai dan setelah waktu akhir. Ada kemungkinan juga bahwa beberapa kejadian yang di antara waktu mulai dan berakhir tidak akan muncul. Itu karena kejadian mungkin direkam selama jam sebelumnya ke waktu mulai atau selama jam setelah waktu akhir.

Langkah berikutnya