Model penarikan umpan perubahan di Azure Cosmos DB

BERLAKU UNTUK: NoSQL

Anda dapat menggunakan model penarikan umpan perubahan untuk menggunakan umpan perubahan Azure Cosmos DB dengan kecepatan Anda sendiri. Mirip dengan prosesor umpan perubahan, Anda dapat menggunakan model penarikan umpan perubahan untuk menyejajarkan pemrosesan perubahan di beberapa konsumen umpan perubahan.

Bandingkan dengan prosesor umpan perubahan

Banyak skenario dapat memproses umpan perubahan dengan menggunakan prosesor umpan perubahan atau model penarikan umpan perubahan. Token kelanjutan model penarikan dan kontainer sewa prosesor umpan perubahan berfungsi sebagai marka buku untuk item terakhir yang diproses atau batch item di umpan perubahan.

Namun, Anda tidak dapat mengonversi token kelanjutan menjadi sewa atau sebaliknya.

Catatan

Dalam kebanyakan kasus, ketika Anda perlu membaca dari umpan perubahan, opsi paling sederhana adalah menggunakan prosesor umpan perubahan.

Anda harus pertimbangkan model penarikan dalam skenario ini:

  • Untuk membaca perubahan dari kunci partisi tertentu.
  • Untuk mengontrol kecepatan di mana klien Anda menerima perubahan untuk diproses.
  • Untuk melakukan pembacaan satu kali data yang ada di umpan perubahan (misalnya, untuk melakukan migrasi data).

Berikut adalah beberapa perbedaan utama antara prosesor umpan perubahan dan model penarikan umpan perubahan:

Fitur Prosesor umpan perubahan Model penarikan umpan perubahan
Melacak titik saat ini dalam memproses umpan perubahan Sewa (disimpan dalam kontainer Azure Cosmos DB) Token kelanjutan (disimpan dalam memori atau ditahan secara manual)
Kemampuan untuk memutar ulang perubahan sebelumnya Ya, dengan model pendorongan Ya, dengan model penarikan
Polling untuk perubahan di masa mendatang Memeriksa perubahan secara otomatis berdasarkan nilai yang ditentukan WithPollInterval pengguna Manual
Perilaku tanpa perubahan baru Tunggu nilai secara otomatis lalu WithPollInterval centang ulang Harus memeriksa status dan memeriksa ulang secara manual
Memproses perubahan dari seluruh kontainer Ya, dan secara otomatis diparalelkan di beberapa utas dan mesin yang mengonsumsi dari kontainer yang sama Ya, dan diparalelkan secara manual dengan menggunakan FeedRange
Memproses perubahan hanya dari satu kunci partisi Tidak didukung Ya

Catatan

Saat Anda menggunakan model penarikan, tidak seperti saat membaca dengan menggunakan prosesor umpan perubahan, Anda harus secara eksplisit menangani kasus di mana tidak ada perubahan baru.

Bekerja dengan model penarikan

Untuk memproses umpan perubahan dengan menggunakan model penarikan, buat instans FeedIterator. Ketika Anda awalnya membuat FeedIterator, Anda harus menentukan nilai yang diperlukan ChangeFeedStartFrom , yang terdiri dari posisi awal untuk membaca perubahan dan nilai yang ingin Anda gunakan untuk FeedRange. FeedRange adalah rentang nilai kunci partisi dan menentukan item yang dapat dibaca dari umpan perubahan dengan menggunakan spesifik FeedIteratortersebut. Anda juga harus menentukan nilai yang diperlukan ChangeFeedMode untuk mode tempat Anda ingin memproses perubahan: versi terbaru atau semua versi dan penghapusan. Gunakan atau ChangeFeedMode.LatestVersionChangeFeedMode.AllVersionsAndDeletes untuk menunjukkan mode mana yang ingin Anda gunakan untuk membaca umpan perubahan. Saat Anda menggunakan semua versi dan mode penghapusan, Anda harus memilih umpan perubahan mulai dari nilai dari Now() atau dari token kelanjutan tertentu.

Anda dapat secara opsional menentukan ChangeFeedRequestOptions untuk mengatur PageSizeHint. Saat diatur, properti ini menetapkan jumlah maksimum item yang diterima per halaman. Jika operasi dalam koleksi yang dipantau dilakukan melalui prosedur tersimpan, cakupan transaksi dipertahankan saat membaca item dari umpan perubahan. Akibatnya, jumlah item yang diterima mungkin lebih tinggi dari nilai yang ditentukan sehingga item yang diubah oleh transaksi yang sama dikembalikan sebagai bagian dari satu batch atomik.

Berikut adalah contoh cara mendapatkan FeedIterator dalam mode versi terbaru yang mengembalikan objek entitas, dalam hal User ini objek:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Tip

Sebelum versi 3.34.0, mode versi terbaru dapat digunakan dengan mengatur ChangeFeedMode.Incremental. Baik Incremental dan LatestVersion lihat mode versi terbaru dari umpan perubahan dan aplikasi yang menggunakan salah satu mode akan melihat perilaku yang sama.

Semua versi dan mode penghapusan dalam pratinjau dan dapat digunakan dengan pratinjau versi >.NET SDK = 3.32.0-preview. Berikut adalah contoh untuk mendapatkan FeedIterator di semua versi dan menghapus mode yang mengembalikan objek dinamis:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Catatan

Dalam mode versi terbaru, Anda menerima objek yang mewakili item yang berubah, dengan beberapa metadata tambahan. Semua versi dan mode penghapusan mengembalikan model data yang berbeda. Untuk informasi selengkapnya, lihat Mengurai objek respons.

Mengonsumsi umpan perubahan melalui aliran

FeedIterator untuk kedua mode umpan perubahan memiliki dua opsi. Selain contoh yang mengembalikan objek entitas, Anda juga dapat memperoleh respons dengan Stream dukungan. Aliran memungkinkan Anda membaca data tanpa terlebih dahulu deserialisasi, sehingga Anda menghemat sumber daya klien.

Berikut adalah contoh cara mendapatkan FeedIterator dalam mode versi terbaru yang mengembalikan Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Mengonsumsi perubahan untuk seluruh kontainer

Jika Anda tidak menyediakan FeedRange parameter ke FeedIterator, Anda dapat memproses seluruh umpan perubahan kontainer dengan kecepatan Anda sendiri. Berikut adalah contoh, yang mulai membaca semua perubahan, dimulai pada waktu saat ini dengan menggunakan mode versi terbaru:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Karena umpan perubahan secara efektif merupakan daftar item tak terbatas yang mencakup semua penulisan dan pembaruan di masa mendatang, nilai HasMoreResults selalu true. Ketika Anda mencoba membaca umpan perubahan dan tidak ada perubahan baru yang tersedia, Anda menerima respons dengan NotModified status. Dalam contoh sebelumnya, ini ditangani dengan menunggu lima detik sebelum memeriksa ulang perubahan.

Menggunakan perubahan untuk kunci partisi

Dalam beberapa kasus, Anda mungkin hanya ingin memproses perubahan untuk kunci partisi tertentu. Anda dapat memperoleh FeedIterator untuk kunci partisi tertentu dan memproses perubahan dengan cara yang sama seperti yang Anda dapat untuk seluruh kontainer.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Menggunakan FeedRange untuk paralelisasi

Dalam prosesor umpan perubahan, pekerjaan tersebar di beberapa konsumen secara otomatis. Dalam model penarikan umpan perubahan, Anda dapat menggunakan FeedRange untuk paralelisasi pemrosesan umpan perubahan. FeedRange mewakili rentang nilai kunci partisi.

Berikut adalah contoh yang menunjukkan cara mendapatkan daftar rentang untuk kontainer Anda:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Saat Anda mendapatkan daftar FeedRange nilai untuk kontainer, Anda mendapatkan satu FeedRange per partisi fisik.

Dengan menggunakan FeedRange, Anda dapat membuat FeedIterator untuk menyejajarkan pemrosesan umpan perubahan di beberapa komputer atau utas. Berbeda dengan contoh sebelumnya yang menunjukkan cara mendapatkan FeedIterator untuk seluruh kontainer atau satu kunci partisi, Anda dapat menggunakan FeedRanges untuk mendapatkan beberapa FeedIterator yang dapat memproses umpan perubahan secara paralel.

Saat ingin menggunakan FeedRanges, Anda harus memiliki proses orkestrator yang mendapatkan FeedRanges dan mendistribusikannya ke mesin tersebut. Distribusi ini mungkin:

  • Menggunakan FeedRange.ToJsonString dan mendistribusikan nilai untai (karakter) ini. Konsumen dapat menggunakan nilai ini dengan FeedRange.FromJsonString.
  • Jika distribusi sedang berproses, lewati referensi objek FeedRange.

Berikut adalah sampel yang menunjukkan cara membaca dari awal umpan perubahan kontainer dengan menggunakan dua komputer terpisah hipotetis yang dibaca secara paralel:

Mesin 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Mesin 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Menyimpan token kelanjutan

Anda dapat menyimpan posisi FeedIterator dengan mendapatkan token kelanjutan. Token kelanjutan adalah nilai untai (karakter) yang melacak perubahan terakhir yang diproses FeedIterator Anda dan mengizinkan FeedIterator untuk melanjutkan pada titik ini nantinya. Token kelanjutan, jika ditentukan, lebih diutamakan dari waktu mulai dan mulai dari nilai awal. Kode berikut membaca umpan perubahan sejak pembuatan kontainer. Setelah tidak ada lagi perubahan yang tersedia, kode tersebut akan menahan token kelanjutan sehingga konsumsi umpan perubahan dapat dilanjutkan nanti.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Saat Anda menggunakan mode versi terbaru, token kelanjutan FeedIterator tidak pernah kedaluwarsa selama kontainer Azure Cosmos DB masih ada. Saat Anda menggunakan semua versi dan mode penghapusan, FeedIterator token kelanjutan berlaku selama perubahan terjadi dalam jendela retensi untuk pencadangan berkelanjutan.

Langkah berikutnya