Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Rozwiązanie oparte na usłudze Azure Blob Storage do przechowywania punktów kontrolnych i pomaga w równoważeniu obciążenia podczas korzystania z EventHubConsumerClient
z biblioteki @azure/event-hubs
Kluczowe linki:
- kod źródłowy
- pakiet (npm)
- Dokumentacja referencyjna interfejsu API
- przykładów
Wprowadzenie
Obecnie obsługiwane środowiska
- wersje Node.js LTS
- Najnowsze wersje przeglądarek Safari, Chrome, Edge i Firefox.
Aby uzyskać więcej informacji, zobacz nasze zasad pomocy technicznej.
Warunki wstępne
- subskrypcji platformy Azure
- Przestrzeń nazw usługi Event Hubs
- Konto usługi Storage
Instalowanie pakietu
Instalowanie biblioteki obiektów blob magazynu punktów kontrolnych usługi Azure Event Hubs przy użyciu narzędzia npm
npm install @azure/eventhubs-checkpointstore-blob
Konfigurowanie skryptu TypeScript
Użytkownicy języka TypeScript muszą mieć zainstalowane definicje typu węzła:
npm install @types/node
Należy również włączyć compilerOptions.allowSyntheticDefaultImports
w tsconfig.json. Należy pamiętać, że jeśli włączono compilerOptions.esModuleInterop
, allowSyntheticDefaultImports
jest domyślnie włączona. Aby uzyskać więcej informacji, zobacz podręcznik opcje kompilatora języka TypeScript.
Kluczowe pojęcia
Skalowanie: Utwórz wielu użytkowników, a każdy użytkownik przejmuje własność odczytu z kilku partycji usługi Event Hubs.
Równoważenie obciążenia: aplikacje obsługujące równoważenie obciążenia składają się z co najmniej jednego wystąpienia
EventHubConsumerClient
, które zostały skonfigurowane do korzystania ze zdarzeń z tej samej grupy zdarzeń i grupy odbiorców oraz tego samegoCheckpointStore
. Równoważą obciążenie między różnymi wystąpieniami, dystrybuując partycje do przetworzenia między sobą.Checkpointing: Jest to proces, za pomocą którego czytelnicy oznaczają lub zatwierdzają swoją pozycję w sekwencji zdarzeń partycji. Tworzenie punktów kontrolnych jest obowiązkiem konsumenta i odbywa się na podstawie poszczególnych partycji w grupie odbiorców. Ta odpowiedzialność oznacza, że dla każdej grupy odbiorców każdy czytelnik partycji musi śledzić swoją bieżącą pozycję w strumieniu zdarzeń i może poinformować usługę, gdy rozważa ukończenie strumienia danych.
Jeśli czytelnik odłączy się od partycji, po ponownym połączeniu rozpocznie odczytywanie w punkcie kontrolnym, który został wcześniej przesłany przez ostatniego czytelnika tej partycji w tej grupie odbiorców. Gdy czytnik nawiąż połączenie, przekazuje przesunięcie do centrum zdarzeń, aby określić lokalizację, w której ma rozpocząć odczytywanie. W ten sposób można użyć punktów kontrolnych, aby oznaczyć zdarzenia jako "kompletne" przez aplikacje podrzędne i zapewnić odporność w przypadku przejścia w tryb failover między czytnikami uruchomionymi na różnych maszynach. Istnieje możliwość powrotu do starszych danych przez określenie niższego przesunięcia z tego procesu tworzenia punktów kontrolnych. Dzięki temu mechanizmowi tworzenie punktów kontrolnych umożliwia zarówno odporność trybu failover, jak i odtwarzanie strumienia zdarzeń.
BlobCheckpointStore to klasa, która implementuje kluczowe metody wymagane przez klasę EventHubConsumerClient w celu równoważenia obciążenia i aktualizowania punktów kontrolnych.
Przykłady
- tworzenie magazynu punktów kontrolnych przy użyciu usługi Azure Blob Storage
- zdarzeń punktu kontrolnego przy użyciu usługi Azure Blob Storage
Tworzenie CheckpointStore
przy użyciu usługi Azure Blob Storage
Użyj poniższego fragmentu kodu, aby utworzyć CheckpointStore
. Należy podać parametry połączenia do konta magazynu.
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!containerClient.exists()) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
Zdarzenia punktu kontrolnego korzystające z usługi Azure Blob Storage
Aby zdarzenia punktu kontrolnego odebrane przy użyciu usługi Azure Blob Storage, należy przekazać obiekt zgodny z interfejsem SubscriptionEventHandlers wraz z kodem w celu wywołania metody updateCheckpoint()
.
W tym przykładzie SubscriptionHandlers
implementuje SubscriptionEventHandlers, a także obsługuje tworzenie punktów kontrolnych.
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";
const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";
const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);
if (!(await blobContainerClient.exists())) {
await blobContainerClient.create();
}
const checkpointStore = new BlobCheckpointStore(blobContainerClient);
const consumerClient = new EventHubConsumerClient(
consumerGroup,
eventHubConnectionString,
eventHubName,
checkpointStore,
);
const subscription = consumerClient.subscribe({
processEvents: async (events, context) => {
// event processing code goes here
if (events.length === 0) {
// If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
// will pass you an empty array.
return;
}
// Checkpointing will allow your service to pick up from
// where it left off when restarting.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(events[events.length - 1]);
},
processError: async (err, context) => {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
},
});
// Wait for a few seconds to receive events before closing
await new Promise((resolve) => setTimeout(resolve, 10 * 1000));
await subscription.close();
await consumerClient.close();
Rozwiązywanie problemów
Wyrąb
Możesz ustawić zmienną środowiskową AZURE_LOG_LEVEL
na jedną z następujących wartości, aby umożliwić rejestrowanie stderr
:
- gadatliwy
- Informacji
- ostrzeżenie
- błąd
Poziom dziennika można również ustawić programowo, importując pakiet @azure/rejestratora i wywołując funkcję setLogLevel
przy użyciu jednej z wartości na poziomie dziennika.
import { setLogLevel } from "@azure/logger";
setLogLevel("info");
Podczas ustawiania poziomu dziennika programowo lub za pośrednictwem zmiennej środowiskowej AZURE_LOG_LEVEL
wszystkie dzienniki zapisywane przy użyciu poziomu dziennika równego lub mniejszego niż wybrany zostanie wyemitowany.
Na przykład po ustawieniu poziomu dziennika na info
są również emitowane dzienniki zapisywane dla poziomów warning
i error
.
Ten zestaw SDK jest zgodny z wytycznymi zestawu Azure SDK dla języka TypeScript podczas określania poziomu do zalogowania.
Alternatywnie można ustawić zmienną środowiskową DEBUG
w celu pobrania dzienników podczas korzystania z tej biblioteki.
Może to być przydatne, jeśli chcesz również emitować dzienniki z zależności rhea-promise
i rhea
.
Uwaga: AZURE_LOG_LEVEL, jeśli ustawiono, ma pierwszeństwo przed debugowaniem.
Nie należy określać żadnych bibliotek azure
za pomocą debugowania podczas określania również AZURE_LOG_LEVEL lub wywoływania polecenia setLogLevel.
Możesz ustawić następującą zmienną środowiskową, aby pobrać dzienniki debugowania podczas korzystania z tej biblioteki.
- Pobieranie tylko dzienników debugowania na poziomie informacji z obiektu blob checkpointstore usługi EventHubs.
export DEBUG=azure:eventhubs-checkpointstore-blob:info
Rejestrowanie w pliku
Włącz rejestrowanie, jak pokazano powyżej, a następnie uruchom skrypt testowy w następujący sposób:
Instrukcje rejestrowania ze skryptu testowego przejdź do
out.log
i instrukcji rejestrowania z zestawu SDK przejdź dodebug.log
.node your-test-script.js > out.log 2>debug.log
Instrukcje rejestrowania ze skryptu testowego i zestawu SDK przechodzą do tego samego pliku
out.log
przez przekierowanie narzędzia stderr do pliku stdout (&1), a następnie przekierowanie elementu stdout do pliku:node your-test-script.js >out.log 2>&1
Instrukcje rejestrowania ze skryptu testowego i zestawu SDK przejdź do tego samego pliku
out.log
.node your-test-script.js &> out.log
Następne kroki
Zapoznaj się z przykładami katalogu, aby uzyskać szczegółowy przykład.
Przyczyniając się
Jeśli chcesz współtworzyć tę bibliotekę, przeczytaj przewodnik dotyczący współtworzenia , aby dowiedzieć się więcej na temat tworzenia i testowania kodu.
Azure SDK for JavaScript