Gunakan estimator umpan perubahan

BERLAKU UNTUK: NoSQL

Artikel ini menjelaskan bagaimana Anda dapat memantau kemajuan instans prosesor umpan perubahan saat membaca umpan perubahan.

Mengapa memantau kemajuan penting?

Prosesor umpan perubahan bertindak sebagai penunjuk yang bergerak maju di seluruh umpan perubahan Anda dan memberikan perubahan pada implementasi delegasi.

Penyebaran prosesor umpan perubahan Anda dapat memproses perubahan pada tingkat tertentu berdasarkan sumber daya yang tersedia seperti CPU, memori, jaringan, dan sebagainya.

Jika laju ini lebih lambat dari tingkat di mana perubahan Anda terjadi di kontainer Azure Cosmos DB Anda, prosesor Anda akan mulai tertinggal.

Mengidentifikasi skenario ini membantu memahami apakah kita perlu menskalakan penyebaran prosesor umpan perubahan kami.

Mengimplementasikan estimator umpan perubahan

Sebagai model pendorongan untuk pemberitahuan otomatis

Seperti prosesor umpan perubahan, estimator umpan perubahan dapat berfungsi sebagai model pendorongan. Estimator akan mengukur perbedaan antara item yang diproses terakhir (didefinisikan oleh keadaan kontainer sewa) dan perubahan terbaru dalam kontainer, dan mendorong nilai ini ke delegasi. Interval di mana pengukuran diambil juga dapat disesuaikan dengan nilai default 5 detik.

Sebagai contoh, jika prosesor umpan perubahan Anda didefinisikan seperti ini:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Cara yang benar untuk menginisialisasi estimator untuk mengukur prosesor tersebut akan menggunakan GetChangeFeedEstimatorBuilder seperti:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Di mana prosesor dan estimator memiliki nama dan leaseContainer yang sama.

Dua parameter lainnya adalah delegasi, yang akan menerima angka yang menunjukkan berapa banyak perubahan yang tertunda untuk dibaca oleh prosesor, dan interval waktu di mana Anda ingin pengukuran ini diambil.

Contoh delegasi yang menerima estimasi adalah:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Anda dapat mengirim estimasi ini ke solusi pemantauan Anda dan menggunakannya untuk memahami bagaimana kemajuan Anda berperilaku dari waktu ke waktu.

Sebagai estimasi terperinci sesuai permintaan

Berbeda dengan model pendorongan, ada alternatif yang memungkinkan Anda mendapatkan estimasi sesuai permintaan. Model ini juga memberikan informasi yang lebih detail:

  • Perkiraan jeda per sewa.
  • Instans yang memiliki dan memproses setiap sewa, sehingga Anda dapat mengidentifikasi apakah ada masalah pada instans.

Jika prosesor umpan perubahan Anda didefinisikan seperti ini:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Anda dapat membuat estimator dengan konfigurasi sewa yang sama:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

Dan kapan pun Anda menginginkannya, dengan frekuensi yang Anda butuhkan, Anda dapat memperoleh estimasi terperinci:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Setiap ChangeFeedProcessorState akan berisi informasi sewa dan jeda, dan juga siapa instans saat ini yang memilikinya.

Penyebaran estimator

Estimator umpan perubahan tidak perlu digunakan sebagai bagian dari prosesor umpan perubahan Anda, atau menjadi bagian dari proyek yang sama. Sebaiknya sebarkan estimator pada instans independen dan sama sekali berbeda dari prosesor Anda. Satu instans estimator dapat melacak kemajuan untuk semua sewa dan instans dalam penyebaran prosesor umpan perubahan Anda.

Setiap estimasi akan menggunakan unit permintaan dari kontainer yang dipantau dan disewakan. Frekuensi 1 menit di antaranya adalah titik awal yang baik, semakin rendah frekuensi, semakin tinggi unit permintaan yang dikonsumsi.

Sumber Daya Tambahan:

Langkah berikutnya

Sekarang Anda bisa melanjutkan untuk mempelajari selengkapnya tentang prosesor umpan perubahan di artikel berikut ini: