Aracılığıyla paylaş


Depolama Bloblarını kullanarak Javascript için Checkpoint Store kitaplığını Azure Event Hubs

denetim noktalarını depolamak ve @azure/event-hubs kitaplığından kullanırken EventHubConsumerClient yük dengelemeye yardımcı olmak için Azure Blob depolama tabanlı bir çözüm

Kaynak kodu | Paket (npm) | API Başvurusu Belgeleri | Örnekleri

Başlarken

Paketi yükleme

npm kullanarak Azure Event Hubs Checkpoint Store Blob kitaplığını yükleme

npm install @azure/eventhubs-checkpointstore-blob

Önkoşullar: Bu paketi kullanmak için bir Azure aboneliğiniz, event hubs ad alanınız ve bir Depolama hesabınız olmalıdır

Bu paketi Node.js bir uygulamada kullanıyorsanız 8.x veya üzeri Node.js kullanın.

Typescript'i yapılandırma

TypeScript kullanıcılarının Düğüm türü tanımlarının yüklü olması gerekir:

npm install @types/node

tsconfig.json dosyasında da etkinleştirmeniz compilerOptions.allowSyntheticDefaultImports gerekir. seçeneğini etkinleştirdiyseniz compilerOptions.esModuleInteropallowSyntheticDefaultImports varsayılan olarak etkin olduğunu unutmayın. Daha fazla bilgi için bkz. TypeScript'in derleyici seçenekleri el kitabı .

Önemli kavramlar

  • Ölçek: Her tüketicinin birkaç Event Hubs bölümünden okuma sahipliğini aldığı birden çok tüketici oluşturun.

  • Yük dengeleme: Yük dengelemeyi destekleyen uygulamalar, aynı Olay Hub'ından EventHubConsumerClient ve tüketici grubundan ve aynı CheckpointStore. İş yüklerini farklı örnekler arasında dengelemek için işlenecek bölümleri kendi aralarında dağıtırlar.

  • Denetim noktası oluşturma: Bu, okuyucuların bir bölüm olay dizisi içindeki konumlarını işaretlediği veya işlediği bir işlemdir. Denetim noktası oluşturma, tüketicinin sorumluluğundadır ve bir tüketici grubunda bölüm başına temelinde gerçekleşir. Bu sorumluluk, her bir tüketici grubu için her bölüm okuyucusunun geçerli konumunu olay akışında izlemesi gerektiği ve veri akışının tamamlandığını düşündüğünde hizmeti bilgilendirebileceği anlamına gelir.

    Bir okuyucunun bölüm bağlantısı kesilirse yeniden bağlandığında ilgili tüketici grubundaki o bölümün son okuyucusu tarafından daha önce gönderilen denetim noktasında okumaya başlar. Okuyucu bağlandığında, okumaya başlayacağı konumu belirtmek için uzaklığı olay hub'ına geçirir. Bu şekilde, denetim noktası oluşturma özelliğini hem aşağı akış uygulamaları ile olayları "tamamlandı" olarak işaretlemek hem de farklı makinelerde çalışan okuyucular arasında bir yük devretme oluşması durumunda esneklik sağlamak amacıyla kullanabilirsiniz. Bu denetim noktası oluşturma işleminden daha düşük bir uzaklık belirterek daha eski verilere geri dönülebilir. Bu mekanizmayla denetim noktası oluşturma özelliği hem yük devretme esnekliği hem de olay akışı yeniden yürütmesi sağlar.

    BlobCheckpointStore, yük ve güncelleştirme denetim noktalarını dengelemek için EventHubConsumerClient tarafından gereken temel yöntemleri uygulayan bir sınıftır.

Örnekler

Azure Blob Depolama kullanarak oluşturma CheckpointStore

Oluşturmak CheckpointStoreiçin aşağıdaki kod parçacığını kullanın. depolama hesabınıza bağlantı dizesi sağlamanız gerekir.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Azure Blob depolama kullanan denetim noktası olayları

Azure Blob Depolama kullanılarak alınan olayları denetlemek için SubscriptionEventHandlers arabirimiyle uyumlu bir nesneyi ve yöntemini çağırma updateCheckpoint() kodunu geçirmeniz gerekir.

Bu örnekte, SubscriptionHandlersSubscriptionEventHandlers uygular ve denetim noktası oluşturmayı da işler.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Sorun giderme

Günlükleri etkinleştirme

olarak günlüğe kaydetmeyi AZURE_LOG_LEVELstderretkinleştirmek için ortam değişkenini aşağıdaki değerlerden birine ayarlayabilirsiniz:

  • ayrıntılı
  • bilgiler
  • warning
  • error

Ayrıca @azure/günlükçü paketini içeri aktarıp günlük düzeyi değerlerinden biriyle işlevini çağırarak setLogLevel günlük düzeyini programlı olarak ayarlayabilirsiniz.

Günlük düzeyini program aracılığıyla veya ortam değişkeni aracılığıyla AZURE_LOG_LEVEL ayarlarken, seçtiğinize eşit veya ondan küçük bir günlük düzeyi kullanılarak yazılan tüm günlükler yayılır. Örneğin, günlük düzeyini olarak infoayarladığınızda, düzeyler warning için yazılan ve error aynı zamanda yayılan günlükler. Bu SDK, hangi düzeyde oturum açıldığı belirlenirken TypeScript için Azure SDK yönergelerini izler.

Alternatif olarak, bu kitaplığı kullanırken günlükleri almak için ortam değişkenini ayarlayabilirsiniz DEBUG . Bu, günlükleri bağımlılıklardan rhea-promiserhea ve bağımlılıklardan da yaymak istiyorsanız yararlı olabilir.

Not: AZURE_LOG_LEVEL ayarlanırsa DEBUG'a göre önceliklidir. AZURE_LOG_LEVEL belirtirken veya setLogLevel'i çağırırken DEBUG aracılığıyla hiçbir azure kitaplık belirtmeyin.

Bu kitaplığı kullanırken hata ayıklama günlüklerini almak için aşağıdaki ortam değişkenini ayarlayabilirsiniz.

  • Eventhubs Checkpointstore Blobundan yalnızca bilgi düzeyinde hata ayıklama günlükleri alınıyor.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Bir dosyada günlüğe kaydetme

  • Yukarıda gösterildiği gibi günlüğe kaydetmeyi etkinleştirin ve ardından test betiğinizi aşağıdaki gibi çalıştırın:

    • Test betiğinizdeki günlük deyimleri adresine out.log , sdk'dan günlük deyimleri ise adresine debug.loggider.

      node your-test-script.js > out.log 2>debug.log
      
    • Test betiğinizden ve sdk'dan günlük deyimleri, stderr'ı stdout'a (&1) yönlendirerek aynı out.log dosyaya gider ve ardından stdout'ı bir dosyaya yeniden yönlendirir:

      node your-test-script.js >out.log 2>&1
      
    • Test betiğinizden ve sdk'dan günlük deyimleri aynı dosyaya out.loggider.

      node your-test-script.js &> out.log
      

Sonraki Adımlar

Ayrıntılı örnek için lütfen samples dizinine göz atın.

Katkıda bulunma

Bu kitaplığa katkıda bulunmak isterseniz, kodu derleme ve test etme hakkında daha fazla bilgi edinmek için lütfen katkıda bulunma kılavuzunu okuyun.

İzlenimler