Bagikan melalui


Azure Event Hubs pustaka Checkpoint Store untuk Javascript menggunakan Blob Penyimpanan

Solusi berbasis penyimpanan Azure Blob untuk menyimpan titik pemeriksaan dan membantu penyeimbangan beban saat menggunakan EventHubConsumerClient dari pustaka @azure/event-hubs

Kode sumber | Paket (npm) | Dokumentasi | Referensi APISampel

Memulai

Menginstal paket

Instal pustaka Azure Event Hubs Checkpoint Store Blob menggunakan npm

npm install @azure/eventhubs-checkpointstore-blob

Prasyarat: Anda harus memiliki langganan Azure, Namespace Layanan Pusat Aktivitas untuk menggunakan paket ini, dan akun Penyimpanan

Jika Anda menggunakan paket ini dalam aplikasi Node.js, gunakan Node.js 8.x atau yang lebih tinggi.

Mengonfigurasi Typescript

Pengguna TypeScript harus menginstal definisi jenis Node:

npm install @types/node

Anda juga perlu mengaktifkan compilerOptions.allowSyntheticDefaultImports di tsconfig.json Anda. Perhatikan bahwa jika Anda telah mengaktifkan compilerOptions.esModuleInterop, allowSyntheticDefaultImports diaktifkan secara default. Lihat buku pegangan opsi pengkompilasi TypeScript untuk informasi selengkapnya.

Konsep utama

  • Skala: Buat beberapa konsumen, dengan tiap konsumen mengambil kepemilikan membaca dari beberapa partisi Azure Event Hubs.

  • Keseimbangan beban: Aplikasi yang mendukung penyeimbangan beban terdiri dari satu atau beberapa instans yang telah dikonfigurasi untuk mengonsumsi EventHubConsumerClient peristiwa dari Event Hub dan grup konsumen yang sama dan yang sama CheckpointStore. Mereka menyeimbangkan beban kerja di berbagai instans dengan mendistribusikan partisi yang akan diproses di antara mereka sendiri.

  • Titik pemeriksaan: Ini adalah proses di mana pembaca menandai atau menerapkan posisi mereka dalam urutan peristiwa partisi. Titik pemeriksaan adalah tanggung jawab konsumen dan terjadi berbasis partisi dalam kelompok konsumen. Tanggung jawab ini berarti bahwa untuk setiap kelompok konsumen, setiap pembaca partisi harus melacak posisinya saat ini di aliran peristiwa, dan dapat menginformasikan layanan ketika menganggap aliran data selesai.

    Jika pembaca memutuskan sambungan dari partisi, ketika menghubungkannya kembali, ia mulai membaca di titik pemeriksaan yang sebelumnya dikirimkan oleh pembaca terakhir partisi dalam kelompok konsumen itu. Ketika pembaca terhubung, itu menyalurkan offset ke pusat aktivitas untuk menentukan lokasi untuk mulai membaca. Dengan cara ini, Anda dapat menggunakan titik pemeriksaan untuk menandai peristiwa sebagai "lengkap" oleh aplikasi hilir, dan untuk memberikan ketahanan jika kegagalan dengan pembaca yang menjalankan komputer yang berbeda terjadi. Anda dapat kembali ke data yang lebih lama dengan menentukan offset yang lebih rendah dari proses titik pemeriksaan ini. Melalui mekanisme ini, titik pemeriksaan memungkinkan ketahanan kegagalan dan pemutaran ulang aliran peristiwa.

    BlobCheckpointStore adalah kelas yang menerapkan metode utama yang diperlukan oleh EventHubConsumerClient untuk menyeimbangkan beban dan memperbarui titik pemeriksaan.

Contoh

CheckpointStore Membuat menggunakan Azure Blob Storage

Gunakan cuplikan kode di bawah ini untuk membuat CheckpointStore. Anda harus memberikan string koneksi ke akun penyimpanan Anda.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Peristiwa titik pemeriksaan menggunakan penyimpanan Azure Blob

Untuk peristiwa titik pemeriksaan yang diterima menggunakan Azure Blob Storage, Anda harus meneruskan objek yang kompatibel dengan antarmuka SubscriptionEventHandlers bersama dengan kode untuk memanggil updateCheckpoint() metode .

Dalam contoh ini, SubscriptionHandlers menerapkan SubscriptionEventHandlers dan juga menangani titik pemeriksaan.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Pemecahan Masalah

Aktifkan log

Anda dapat mengatur AZURE_LOG_LEVEL variabel lingkungan ke salah satu nilai berikut untuk mengaktifkan pengelogan ke stderr:

  • verbose
  • info
  • peringatan
  • kesalahan

Anda juga dapat mengatur tingkat log secara terprogram dengan mengimpor paket @azure/pencatat dan memanggil setLogLevel fungsi dengan salah satu nilai tingkat log.

Saat mengatur tingkat log baik secara terprogram atau melalui AZURE_LOG_LEVEL variabel lingkungan, log apa pun yang ditulis menggunakan tingkat log yang sama dengan atau kurang dari yang Anda pilih akan dipancarkan. Misalnya, saat Anda mengatur tingkat log ke info, log yang ditulis untuk tingkat warning dan error juga dipancarkan. SDK ini mengikuti panduan Azure SDK untuk TypeScript saat menentukan tingkat mana yang akan masuk.

Anda dapat mengatur DEBUG variabel lingkungan untuk mendapatkan log saat menggunakan pustaka ini. Ini dapat berguna jika Anda juga ingin memancarkan log dari dependensi rhea-promise dan rhea juga.

Catatan: AZURE_LOG_LEVEL, jika diatur, lebih diutamakan daripada DEBUG. Jangan tentukan pustaka apa pun azure melalui DEBUG saat juga menentukan AZURE_LOG_LEVEL atau memanggil setLogLevel.

Anda dapat mengatur variabel lingkungan berikut untuk mendapatkan log debug saat menggunakan pustaka ini.

  • Hanya mendapatkan log debug tingkat info dari Blob Eventhubs Checkpointstore.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Pengelogan ke file

  • Aktifkan pengelogan seperti yang ditunjukkan di atas lalu jalankan skrip pengujian Anda sebagai berikut:

    • Pernyataan pengelogan dari skrip pengujian Anda masuk ke out.log dan pernyataan pengelogan dari sdk buka debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • Pernyataan pengelogan dari skrip pengujian Anda dan sdk masuk ke file out.log yang sama dengan mengalihkan stderr ke stdout (&1), lalu alihkan stdout ke file:

      node your-test-script.js >out.log 2>&1
      
    • Pernyataan pengelogan dari skrip pengujian Anda dan sdk masuk ke file out.logyang sama .

      node your-test-script.js &> out.log
      

Langkah berikutnya

Silakan lihat direktori sampel untuk contoh terperinci.

Berkontribusi

Jika Anda ingin berkontribusi pada pustaka ini, baca panduan berkontribusi untuk mempelajari selengkapnya tentang cara membuat dan menguji kode.

Tayangan