Udostępnij za pośrednictwem


Biblioteka klienta magazynu punktów kontrolnych usługi Azure Event Hubs dla języka JavaScript — wersja 2.0.0-beta.1

Rozwiązanie oparte na usłudze Azure Blob Storage do przechowywania punktów kontrolnych i pomaga w równoważeniu obciążenia podczas korzystania z EventHubConsumerClient z biblioteki @azure/event-hubs

Kluczowe linki:

Wprowadzenie

Obecnie obsługiwane środowiska

  • wersje Node.js LTS
  • Najnowsze wersje przeglądarek Safari, Chrome, Edge i Firefox.

Aby uzyskać więcej informacji, zobacz nasze zasad pomocy technicznej.

Warunki wstępne

Instalowanie pakietu

Instalowanie biblioteki obiektów blob magazynu punktów kontrolnych usługi Azure Event Hubs przy użyciu narzędzia npm

npm install @azure/eventhubs-checkpointstore-blob

Konfigurowanie skryptu TypeScript

Użytkownicy języka TypeScript muszą mieć zainstalowane definicje typu węzła:

npm install @types/node

Należy również włączyć compilerOptions.allowSyntheticDefaultImports w tsconfig.json. Należy pamiętać, że jeśli włączono compilerOptions.esModuleInterop, allowSyntheticDefaultImports jest domyślnie włączona. Aby uzyskać więcej informacji, zobacz podręcznik opcje kompilatora języka TypeScript.

Kluczowe pojęcia

  • Skalowanie: Utwórz wielu użytkowników, a każdy użytkownik przejmuje własność odczytu z kilku partycji usługi Event Hubs.

  • Równoważenie obciążenia: aplikacje obsługujące równoważenie obciążenia składają się z co najmniej jednego wystąpienia EventHubConsumerClient, które zostały skonfigurowane do korzystania ze zdarzeń z tej samej grupy zdarzeń i grupy odbiorców oraz tego samego CheckpointStore. Równoważą obciążenie między różnymi wystąpieniami, dystrybuując partycje do przetworzenia między sobą.

  • Checkpointing: Jest to proces, za pomocą którego czytelnicy oznaczają lub zatwierdzają swoją pozycję w sekwencji zdarzeń partycji. Tworzenie punktów kontrolnych jest obowiązkiem konsumenta i odbywa się na podstawie poszczególnych partycji w grupie odbiorców. Ta odpowiedzialność oznacza, że dla każdej grupy odbiorców każdy czytelnik partycji musi śledzić swoją bieżącą pozycję w strumieniu zdarzeń i może poinformować usługę, gdy rozważa ukończenie strumienia danych.

    Jeśli czytelnik odłączy się od partycji, po ponownym połączeniu rozpocznie odczytywanie w punkcie kontrolnym, który został wcześniej przesłany przez ostatniego czytelnika tej partycji w tej grupie odbiorców. Gdy czytnik nawiąż połączenie, przekazuje przesunięcie do centrum zdarzeń, aby określić lokalizację, w której ma rozpocząć odczytywanie. W ten sposób można użyć punktów kontrolnych, aby oznaczyć zdarzenia jako "kompletne" przez aplikacje podrzędne i zapewnić odporność w przypadku przejścia w tryb failover między czytnikami uruchomionymi na różnych maszynach. Istnieje możliwość powrotu do starszych danych przez określenie niższego przesunięcia z tego procesu tworzenia punktów kontrolnych. Dzięki temu mechanizmowi tworzenie punktów kontrolnych umożliwia zarówno odporność trybu failover, jak i odtwarzanie strumienia zdarzeń.

    BlobCheckpointStore to klasa, która implementuje kluczowe metody wymagane przez klasę EventHubConsumerClient w celu równoważenia obciążenia i aktualizowania punktów kontrolnych.

Przykłady

Tworzenie CheckpointStore przy użyciu usługi Azure Blob Storage

Użyj poniższego fragmentu kodu, aby utworzyć CheckpointStore. Należy podać parametry połączenia do konta magazynu.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

Zdarzenia punktu kontrolnego korzystające z usługi Azure Blob Storage

Aby zdarzenia punktu kontrolnego odebrane przy użyciu usługi Azure Blob Storage, należy przekazać obiekt zgodny z interfejsem SubscriptionEventHandlers wraz z kodem w celu wywołania metody updateCheckpoint().

W tym przykładzie SubscriptionHandlers implementuje SubscriptionEventHandlers, a także obsługuje tworzenie punktów kontrolnych.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

if (!(await blobContainerClient.exists())) {
  await blobContainerClient.create();
}

const checkpointStore = new BlobCheckpointStore(blobContainerClient);
const consumerClient = new EventHubConsumerClient(
  consumerGroup,
  eventHubConnectionString,
  eventHubName,
  checkpointStore,
);

const subscription = consumerClient.subscribe({
  processEvents: async (events, context) => {
    // event processing code goes here
    if (events.length === 0) {
      // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
      // will pass you an empty array.
      return;
    }

    // Checkpointing will allow your service to pick up from
    // where it left off when restarting.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(events[events.length - 1]);
  },
  processError: async (err, context) => {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
  },
});

// Wait for a few seconds to receive events before closing
await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

await subscription.close();
await consumerClient.close();

Rozwiązywanie problemów

Wyrąb

Możesz ustawić zmienną środowiskową AZURE_LOG_LEVEL na jedną z następujących wartości, aby umożliwić rejestrowanie stderr:

  • gadatliwy
  • Informacji
  • ostrzeżenie
  • błąd

Poziom dziennika można również ustawić programowo, importując pakiet @azure/rejestratora i wywołując funkcję setLogLevel przy użyciu jednej z wartości na poziomie dziennika.

import { setLogLevel } from "@azure/logger";

setLogLevel("info");

Podczas ustawiania poziomu dziennika programowo lub za pośrednictwem zmiennej środowiskowej AZURE_LOG_LEVEL wszystkie dzienniki zapisywane przy użyciu poziomu dziennika równego lub mniejszego niż wybrany zostanie wyemitowany. Na przykład po ustawieniu poziomu dziennika na infosą również emitowane dzienniki zapisywane dla poziomów warning i error. Ten zestaw SDK jest zgodny z wytycznymi zestawu Azure SDK dla języka TypeScript podczas określania poziomu do zalogowania.

Alternatywnie można ustawić zmienną środowiskową DEBUG w celu pobrania dzienników podczas korzystania z tej biblioteki. Może to być przydatne, jeśli chcesz również emitować dzienniki z zależności rhea-promise i rhea.

Uwaga: AZURE_LOG_LEVEL, jeśli ustawiono, ma pierwszeństwo przed debugowaniem. Nie należy określać żadnych bibliotek azure za pomocą debugowania podczas określania również AZURE_LOG_LEVEL lub wywoływania polecenia setLogLevel.

Możesz ustawić następującą zmienną środowiskową, aby pobrać dzienniki debugowania podczas korzystania z tej biblioteki.

  • Pobieranie tylko dzienników debugowania na poziomie informacji z obiektu blob checkpointstore usługi EventHubs.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Rejestrowanie w pliku

  • Włącz rejestrowanie, jak pokazano powyżej, a następnie uruchom skrypt testowy w następujący sposób:

    • Instrukcje rejestrowania ze skryptu testowego przejdź do out.log i instrukcji rejestrowania z zestawu SDK przejdź do debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • Instrukcje rejestrowania ze skryptu testowego i zestawu SDK przechodzą do tego samego pliku out.log przez przekierowanie narzędzia stderr do pliku stdout (&1), a następnie przekierowanie elementu stdout do pliku:

      node your-test-script.js >out.log 2>&1
      
    • Instrukcje rejestrowania ze skryptu testowego i zestawu SDK przejdź do tego samego pliku out.log.

      node your-test-script.js &> out.log
      

Następne kroki

Zapoznaj się z przykładami katalogu, aby uzyskać szczegółowy przykład.

Przyczyniając się

Jeśli chcesz współtworzyć tę bibliotekę, przeczytaj przewodnik dotyczący współtworzenia , aby dowiedzieć się więcej na temat tworzenia i testowania kodu.