Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Solusi berbasis penyimpanan Azure Blob untuk menyimpan titik pemeriksaan dan membantu penyeimbangan beban saat menggunakan EventHubConsumerClient dari pustaka @azure/event-hubs
Kode sumber | Paket (npm) | Dokumentasi | Referensi APISampel
Memulai
Menginstal paket
Instal pustaka Azure Event Hubs Checkpoint Store Blob menggunakan npm
npm install @azure/eventhubs-checkpointstore-blob
Prasyarat: Anda harus memiliki langganan Azure, Namespace Layanan Pusat Aktivitas untuk menggunakan paket ini, dan akun Penyimpanan
Jika Anda menggunakan paket ini dalam aplikasi Node.js, gunakan Node.js 8.x atau yang lebih tinggi.
Mengonfigurasi Typescript
Pengguna TypeScript harus menginstal definisi jenis Node:
npm install @types/node
Anda juga perlu mengaktifkan compilerOptions.allowSyntheticDefaultImports di tsconfig.json Anda. Perhatikan bahwa jika Anda telah mengaktifkan compilerOptions.esModuleInterop, allowSyntheticDefaultImports diaktifkan secara default. Lihat buku pegangan opsi pengkompilasi TypeScript untuk informasi selengkapnya.
Konsep utama
Skala: Buat beberapa konsumen, dengan tiap konsumen mengambil kepemilikan membaca dari beberapa partisi Azure Event Hubs.
Keseimbangan beban: Aplikasi yang mendukung penyeimbangan beban terdiri dari satu atau beberapa instans yang telah dikonfigurasi untuk mengonsumsi
EventHubConsumerClientperistiwa dari Event Hub dan grup konsumen yang sama dan yang samaCheckpointStore. Mereka menyeimbangkan beban kerja di berbagai instans dengan mendistribusikan partisi yang akan diproses di antara mereka sendiri.Titik pemeriksaan: Ini adalah proses di mana pembaca menandai atau menerapkan posisi mereka dalam urutan peristiwa partisi. Titik pemeriksaan adalah tanggung jawab konsumen dan terjadi berbasis partisi dalam kelompok konsumen. Tanggung jawab ini berarti bahwa untuk setiap kelompok konsumen, setiap pembaca partisi harus melacak posisinya saat ini di aliran peristiwa, dan dapat menginformasikan layanan ketika menganggap aliran data selesai.
Jika pembaca memutuskan sambungan dari partisi, ketika menghubungkannya kembali, ia mulai membaca di titik pemeriksaan yang sebelumnya dikirimkan oleh pembaca terakhir partisi dalam kelompok konsumen itu. Ketika pembaca terhubung, itu menyalurkan offset ke pusat aktivitas untuk menentukan lokasi untuk mulai membaca. Dengan cara ini, Anda dapat menggunakan titik pemeriksaan untuk menandai peristiwa sebagai "lengkap" oleh aplikasi hilir, dan untuk memberikan ketahanan jika kegagalan dengan pembaca yang menjalankan komputer yang berbeda terjadi. Anda dapat kembali ke data yang lebih lama dengan menentukan offset yang lebih rendah dari proses titik pemeriksaan ini. Melalui mekanisme ini, titik pemeriksaan memungkinkan ketahanan kegagalan dan pemutaran ulang aliran peristiwa.
BlobCheckpointStore adalah kelas yang menerapkan metode utama yang diperlukan oleh EventHubConsumerClient untuk menyeimbangkan beban dan memperbarui titik pemeriksaan.
Contoh
- Membuat CheckpointStore menggunakan Azure Blob Storage
- Peristiwa titik pemeriksaan menggunakan penyimpanan Azure Blob
CheckpointStore Membuat menggunakan Azure Blob Storage
Gunakan cuplikan kode di bawah ini untuk membuat CheckpointStore. Anda harus memberikan string koneksi ke akun penyimpanan Anda.
import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!containerClient.exists()) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
Peristiwa titik pemeriksaan menggunakan penyimpanan Azure Blob
Untuk peristiwa titik pemeriksaan yang diterima menggunakan Azure Blob Storage, Anda harus meneruskan objek yang kompatibel dengan antarmuka SubscriptionEventHandlers bersama dengan kode untuk memanggil updateCheckpoint() metode .
Dalam contoh ini, SubscriptionHandlers menerapkan SubscriptionEventHandlers dan juga menangani titik pemeriksaan.
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";
const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!(await containerClient.exists())) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
class SubscriptionHandlers {
async processEvents(event, context) {
// custom logic for processing events goes here
// Checkpointing will allow your service to restart and pick
// up from where it left off.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(event);
}
async processError(err, context) {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription: ${err}`);
}
}
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);
const subscription = consumerClient.subscribe(new SubscriptionHandlers());
// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();
Pemecahan Masalah
Aktifkan log
Anda dapat mengatur AZURE_LOG_LEVEL variabel lingkungan ke salah satu nilai berikut untuk mengaktifkan pengelogan ke stderr:
- verbose
- info
- peringatan
- kesalahan
Anda juga dapat mengatur tingkat log secara terprogram dengan mengimpor paket @azure/pencatat dan memanggil setLogLevel fungsi dengan salah satu nilai tingkat log.
Saat mengatur tingkat log baik secara terprogram atau melalui AZURE_LOG_LEVEL variabel lingkungan, log apa pun yang ditulis menggunakan tingkat log yang sama dengan atau kurang dari yang Anda pilih akan dipancarkan.
Misalnya, saat Anda mengatur tingkat log ke info, log yang ditulis untuk tingkat warning dan error juga dipancarkan.
SDK ini mengikuti panduan Azure SDK untuk TypeScript saat menentukan tingkat mana yang akan masuk.
Anda dapat mengatur DEBUG variabel lingkungan untuk mendapatkan log saat menggunakan pustaka ini.
Ini dapat berguna jika Anda juga ingin memancarkan log dari dependensi rhea-promise dan rhea juga.
Catatan: AZURE_LOG_LEVEL, jika diatur, lebih diutamakan daripada DEBUG.
Jangan tentukan pustaka apa pun azure melalui DEBUG saat juga menentukan AZURE_LOG_LEVEL atau memanggil setLogLevel.
Anda dapat mengatur variabel lingkungan berikut untuk mendapatkan log debug saat menggunakan pustaka ini.
- Hanya mendapatkan log debug tingkat info dari Blob Eventhubs Checkpointstore.
export DEBUG=azure:eventhubs-checkpointstore-blob:info
Pengelogan ke file
Aktifkan pengelogan seperti yang ditunjukkan di atas lalu jalankan skrip pengujian Anda sebagai berikut:
Pernyataan pengelogan dari skrip pengujian Anda masuk ke
out.logdan pernyataan pengelogan dari sdk bukadebug.log.node your-test-script.js > out.log 2>debug.logPernyataan pengelogan dari skrip pengujian Anda dan sdk masuk ke file
out.logyang sama dengan mengalihkan stderr ke stdout (&1), lalu alihkan stdout ke file:node your-test-script.js >out.log 2>&1Pernyataan pengelogan dari skrip pengujian Anda dan sdk masuk ke file
out.logyang sama .node your-test-script.js &> out.log
Langkah berikutnya
Silakan lihat direktori sampel untuk contoh terperinci.
Berkontribusi
Jika Anda ingin berkontribusi pada pustaka ini, baca panduan berkontribusi untuk mempelajari selengkapnya tentang cara membuat dan menguji kode.
Azure SDK for JavaScript