Bagikan melalui


Mengubah prosesor umpan di Azure Cosmos DB

BERLAKU UNTUK: NoSQL

Pemroses umpan perubahan adalah bagian dari Azure Cosmos DB .NET V3 dan SDK Java V4. Hal ini menyederhanakan proses membaca feed perubahan dan mendistribusikan pemrosesan peristiwa di beberapa konsumen secara efektif.

Manfaat utama menggunakan prosesor umpan perubahan adalah desainnya yang toleran terhadap kesalahan, yang memastikan pengiriman "setidaknya sekali" dari semua peristiwa dalam umpan perubahan.

SDK yang didukung

.Net V3 Java Node.JS Python

Komponen prosesor umpan perubahan

Prosesor umpan perubahan memiliki empat komponen utama:

  • Kontainer terpantau: Kontainer terpantau memiliki data dari asal umpan perubahan dihasilkan. Setiap sisipan dan pembaruan pada kontainer yang dipantau tercermin dalam umpan perubahan kontainer.

  • Kontainer sewa: Kontainer sewa bertindak sebagai penyimpanan status dan mengoordinasikan pemrosesan umpan perubahan di beberapa pekerja. Kontainer sewa dapat disimpan dalam akun yang sama dengan kontainer yang dipantau atau di akun terpisah.

  • Instans komputasi: Instans komputasi yang menghosting prosesor umpan perubahan untuk mendengarkan perubahan. Tergantung pada platform, itu mungkin diwakili oleh komputer virtual (VM), pod Kubernetes, instans Azure App Service, atau komputer fisik aktual. Instans komputasi memiliki pengidentifikasi unik yang disebut nama instans di seluruh artikel ini.

  • Delegasi: Delegasi adalah kode yang mendefinisikan apa yang Anda, pengembang, ingin lakukan dengan setiap batch perubahan yang dibaca prosesor umpan perubahan.

Untuk lebih memahami bagaimana keempat elemen prosesor umpan perubahan ini bekerja sama, mari kita lihat contoh dalam diagram berikut. Kontainer yang dipantau menyimpan item dan menggunakan 'Kota' sebagai kunci partisi. Nilai kunci partisi didistribusikan dalam rentang (setiap rentang mewakili partisi fisik) yang berisi item.

Diagram menunjukkan dua instans komputasi, dan prosesor umpan perubahan menetapkan rentang yang berbeda untuk setiap instans untuk memaksimalkan distribusi komputasi. Setiap instans memiliki nama yang berbeda dan unik.

Setiap rentang dibaca secara paralel. Kemajuan rentang dipertahankan secara terpisah dari rentang lain dalam kontainer sewa melalui dokumen sewa . Kombinasi sewa mewakili status saat ini dari prosesor umpan perubahan.

Ubah contoh prosesor umpan

Menerapkan pemroses umpan perubahan

Prosesor umpan perubahan di .NET tersedia untuk mode versi terbaru dan semua versi dan mode penghapusan. Semua versi dan mode penghapusan dalam pratinjau dan didukung untuk prosesor umpan perubahan yang dimulai dalam versi 3.40.0-preview.0. Titik entri untuk kedua mode selalu merupakan kontainer yang dipantau.

Untuk membaca menggunakan mode versi terbaru, dalam Container instans, Anda memanggil GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Untuk membaca menggunakan semua versi dan mode penghapusan, panggil GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes dari Container instans:

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Untuk kedua mode, parameter pertama adalah nama berbeda yang menjelaskan tujuan prosesor ini. Nama kedua adalah implementasi delegasi yang menangani perubahan.

Berikut adalah contoh delegasi untuk mode versi terbaru:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Berikut adalah contoh delegasi untuk semua versi dan mode penghapusan:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

Setelah itu, Anda menentukan nama instans komputasi atau pengidentifikasi unik dengan menggunakan WithInstanceName. Nama instans komputasi harus unik dan berbeda untuk setiap instans komputasi yang Anda sebarkan. Anda mengatur kontainer untuk mempertahankan status sewa dengan menggunakan WithLeaseContainer.

Memanggil Build memberi Anda instans prosesor yang dapat Anda mulai dengan memanggil StartAsync.

Catatan

Cuplikan kode sebelumnya diambil dari sampel di GitHub. Anda bisa mendapatkan sampel untuk mode versi terbaru atau semua versi dan mode penghapusan.

Memproses siklus hidup

Siklus hidup normal instans host adalah:

  1. Membaca umpan perubahan.
  2. Jika tidak ada perubahan, tidurlah untuk jumlah waktu yang telah ditentukan sebelumnya (dapat disesuaikan dengan menggunakan WithPollInterval di Builder) dan buka #1.
  3. Jika ada perubahan, instans mengirimkannya ke delegasi.
  4. Saat delegasi selesai memproses perubahan dengan sukses, perbarui toko sewa dengan titik waktu yang diproses terbaru dan buka #1.

Penanganan kesalahan

Prosesor umpan perubahan tahan terhadap kesalahan kode pengguna. Jika implementasi delegasi Anda memiliki pengecualian yang tidak tertangani (langkah #4), utas yang memproses batch perubahan tertentu berhenti, dan utas baru akhirnya dibuat. Utas baru memeriksa titik waktu terbaru yang telah disimpan penyimpanan sewa untuk rentang nilai kunci partisi tersebut. Utas baru dimulai ulang dari sana, secara efektif mengirim batch perubahan yang sama ke delegasi. Perilaku ini berlanjut sampai delegasi Anda memproses perubahan dengan benar, dan itu adalah alasan prosesor umpan perubahan memiliki jaminan "setidaknya sekali".

Catatan

Hanya dalam satu skenario, batch perubahan tidak dicoba kembali. Jika kegagalan terjadi pada eksekusi delegasi pertama kali, penyimpanan sewa tidak memiliki status tersimpan sebelumnya yang akan digunakan pada coba lagi. Dalam kasus tersebut, coba lagi menggunakan konfigurasi awal, yang mungkin atau mungkin tidak menyertakan batch terakhir.

Untuk mencegah prosesor umpan perubahan Anda "macet" terus mencoba kembali batch perubahan yang sama, Anda harus menambahkan logika dalam kode delegasi Anda untuk menulis dokumen, setelah pengecualian, ke antrean pesan yang salah. Desain ini memastikan Anda dapat melacak perubahan yang tidak diproses sekaligus tetap dapat terus memproses perubahan di masa mendatang. Antrean pesan kesalahan mungkin berupa kontainer Azure Cosmos DB lainnya. Penyimpanan data yang tepat tidak masalah. Anda hanya ingin perubahan yang tidak diolah dipertahankan.

Anda juga dapat menggunakan penaksir umpan perubahan untuk memantau kemajuan instans prosesor umpan perubahan saat mereka membaca umpan perubahan, atau Anda dapat menggunakan pemberitahuan siklus hidup untuk mendeteksi kegagalan yang mendasarinya.

Pemberitahuan siklus hidup

Anda dapat menghubungkan prosesor umpan perubahan ke peristiwa yang relevan dalam siklus hidupnya. Anda dapat memilih untuk diberi tahu kepada satu atau semuanya. Rekomendasinya adalah setidaknya mendaftarkan pemberitahuan kesalahan:

  • Daftarkan penghandel untuk WithLeaseAcquireNotification diberi tahu saat host saat ini memperoleh sewa untuk mulai memprosesnya.
  • Daftarkan penghandel untuk WithLeaseReleaseNotification diberi tahu saat host saat ini melepaskan sewa dan berhenti memprosesnya.
  • Daftarkan handler untuk WithErrorNotification diberi tahu ketika host saat ini mengalami pengecualian selama pemrosesan. Anda harus dapat membedakan apakah sumbernya adalah delegasi pengguna (pengecualian yang tidak tertangani) atau kesalahan yang dihadapi prosesor ketika mencoba mengakses kontainer yang dipantau (misalnya, masalah jaringan).

Pemberitahuan siklus hidup tersedia di kedua mode umpan perubahan. Berikut adalah contoh pemberitahuan siklus hidup dalam mode versi terbaru:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Unit penyebaran

Unit penyebaran prosesor umpan perubahan tunggal terdiri dari satu atau beberapa instans komputasi yang memiliki nilai yang sama untuk processorName dan konfigurasi kontainer sewa yang sama, tetapi nama instans yang berbeda. Anda dapat memiliki banyak unit penyebaran di mana setiap unit memiliki alur bisnis yang berbeda untuk perubahan dan setiap unit penyebaran terdiri dari satu atau beberapa instans.

Misalnya, Anda mungkin memiliki satu unit penyebaran yang memicu API eksternal setiap kali ada perubahan dalam kontainer Anda. Unit penyebaran lain mungkin memindahkan data secara real time setiap kali ada perubahan. Saat perubahan terjadi di kontainer yang dipantau, semua unit penyebaran Anda akan diberi tahu.

Penskalaan dinamis

Seperti disebutkan sebelumnya, dalam unit penyebaran, Anda dapat memiliki satu atau beberapa instans komputasi. Untuk memanfaatkan distribusi komputasi dalam unit penyebaran, satu-satunya persyaratan utama adalah:

  • Semua instans harus memiliki konfigurasi kontainer sewa yang sama.
  • Semua instans harus memiliki nilai yang sama untuk processorName.
  • Setiap instans harus memiliki nama instans yang berbeda (WithInstanceName).

Jika ketiga kondisi ini berlaku, maka prosesor umpan perubahan mendistribusikan semua sewa yang ada dalam kontainer sewa di semua instans yang berjalan dari unit penyebaran tersebut, dan menyejajarkan komputasi dengan menggunakan algoritma distribusi yang sama. Sewa dimiliki oleh satu instans kapan saja, sehingga jumlah instans tidak boleh lebih besar dari jumlah sewa.

Jumlah instans dapat bertambah dan menyusut. Prosesor umpan perubahan secara dinamis menyesuaikan beban dengan mendistribusikannya kembali.

Selain itu, prosesor umpan perubahan dapat secara dinamis menyesuaikan skala kontainer jika throughput atau penyimpanan kontainer meningkat. Ketika kontainer Anda tumbuh, prosesor umpan perubahan secara transparan menangani skenario dengan meningkatkan sewa secara dinamis dan mendistribusikan sewa baru di antara instans yang ada.

Waktu memulai

Secara default, ketika prosesor umpan perubahan dimulai untuk pertama kalinya, prosesor ini menginisialisasi kontainer sewa dan memulai siklus hidup pemrosesannya. Setiap perubahan yang terjadi dalam kontainer yang dipantau sebelum prosesor umpan perubahan diinisialisasi untuk pertama kalinya tidak terdeteksi.

Membaca dari tanggal dan waktu sebelumnya

Dimungkinkan untuk menginisialisasi prosesor umpan perubahan untuk membaca perubahan yang dimulai pada tanggal dan waktu tertentu dengan meneruskan instans DateTime ke WithStartTime ekstensi penyusun:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

Prosesor umpan perubahan diinisialisasi untuk tanggal dan waktu tertentu, dan mulai membaca perubahan yang terjadi setelahnya.

Membaca dari awal

Dalam skenario lain, seperti dalam migrasi data atau jika Anda menganalisis seluruh riwayat kontainer, Anda perlu membaca umpan perubahan dari awal masa pakai kontainer tersebut. Anda dapat menggunakan WithStartTime pada ekstensi penyusun, tetapi meneruskan DateTime.MinValue.ToUniversalTime(), yang menghasilkan representasi UTC dari nilai minimum DateTime seperti dalam contoh ini:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Prosesor umpan perubahan diinisialisasi, dan mulai membaca perubahan dari awal masa pakai kontainer.

Catatan

Opsi kustomisasi ini hanya berfungsi untuk menyiapkan titik awal pada waktu prosesor umpan perubahan. Setelah kontainer sewa diinisialisasi untuk pertama kalinya, mengubah opsi ini tidak berpengaruh.

Menyesuaikan titik awal hanya tersedia untuk mode umpan perubahan versi terbaru. Saat menggunakan semua versi dan mode penghapusan, Anda harus mulai membaca sejak prosesor dimulai, atau melanjutkan dari status sewa sebelumnya yang berada dalam periode retensi cadangan berkelanjutan dari akun Anda.

Umpan perubahan dan throughput yang tersedia

Ubah operasi baca umpan pada kontainer yang dipantau menggunakan unit permintaan. Pastikan kontainer yang dipantau tidak mengalami pembatasan. Pembatasan menambahkan penundaan dalam menerima peristiwa umpan perubahan pada prosesor Anda.

Operasi pada kontainer sewa (memperbarui dan mempertahankan status) menggunakan unit permintaan. Semakin tinggi jumlah instans yang menggunakan kontainer sewa yang sama, semakin tinggi potensi konsumsi unit permintaan. Pastikan kontainer sewa Anda tidak mengalami pembatasan. Pembatasan menambahkan penundaan dalam menerima peristiwa umpan perubahan. Pembatasan bahkan dapat sepenuhnya mengakhiri pemrosesan.

Bagikan kontainer sewa

Anda dapat berbagi kontainer sewa di beberapa unit penyebaran. Dalam kontainer sewa bersama, setiap unit penyebaran mendengarkan kontainer yang dipantau yang berbeda atau memiliki nilai yang berbeda untuk processorName. Dalam konfigurasi ini, setiap unit penyebaran mempertahankan status independen pada kontainer sewa. Tinjau konsumsi unit permintaan pada kontainer sewa untuk memastikan bahwa throughput yang disediakan sudah cukup untuk semua unit penyebaran.

Konfigurasi sewa tingkat lanjut

Tiga konfigurasi utama dapat memengaruhi cara kerja prosesor umpan perubahan. Setiap konfigurasi memengaruhi konsumsi unit permintaan pada kontainer sewa. Anda dapat mengatur salah satu konfigurasi ini saat membuat prosesor umpan perubahan, tetapi menggunakannya dengan hati-hati:

  • Peroleh Sewa: Secara default, setiap 17 detik. Host secara berkala memeriksa status penyimpanan sewa dan mempertimbangkan untuk memperoleh sewa sebagai bagian dari proses penskalaan dinamis. Proses ini dilakukan dengan menjalankan Kueri pada kontainer sewa. Mengurangi nilai ini membuat penyeimbangan ulang dan memperoleh sewa lebih cepat, tetapi meningkatkan konsumsi unit permintaan pada kontainer sewa.
  • Kedaluwarsa Sewa: Secara default, 60 detik. Menentukan jumlah waktu maksimum bahwa sewa dapat ada tanpa aktivitas perpanjangan sebelum diperoleh oleh host lain. Ketika host crash, sewa yang dimilikinya diambil oleh host lain setelah periode waktu ini ditambah interval perpanjangan yang dikonfigurasi. Mengurangi nilai ini membuat pemulihan setelah crash host lebih cepat, tetapi nilai kedaluwarsa tidak boleh lebih rendah dari interval perpanjangan.
  • Perpanjangan Sewa: Secara default, setiap 13 detik. Host yang memiliki sewa secara berkala memperbarui sewa, bahkan jika tidak ada perubahan baru untuk dikonsumsi. Proses ini dilakukan dengan mengeksekusi Penggantian sewa. Mengurangi nilai ini menurunkan waktu yang diperlukan untuk mendeteksi sewa yang hilang oleh host yang crash, tetapi meningkatkan konsumsi unit permintaan pada kontainer sewa.

Tempat menghosting prosesor umpan perubahan

Prosesor umpan perubahan dapat dihosting di platform apa pun yang mendukung proses atau tugas yang berjalan lama. Berikut adalah beberapa contoh:

Meskipun prosesor umpan perubahan dapat berjalan di lingkungan berumur pendek karena kontainer sewa mempertahankan status, siklus startup lingkungan ini menambahkan penundaan pada waktu yang diperlukan untuk menerima pemberitahuan (karena overhead memulai prosesor setiap kali lingkungan dimulai).

Persyaratan akses berbasis peran

Saat menggunakan ID Microsoft Entra sebagai mekanisme autentikasi, pastikan identitas memiliki izin yang tepat:

  • Pada kontainer yang dipantau:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • Pada kontainer sewa:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

Sumber Daya Tambahan:

Langkah berikutnya

Pelajari selengkapnya tentang prosesor umpan perubahan di artikel berikut ini: