Sdílet prostřednictvím


Azure Event Hubs knihovny Checkpoint Store pro JavaScript s využitím objektů blob služby Storage

Řešení založené na službě Azure Blob Storage pro ukládání kontrolních bodů a pomoc při vyrovnávání zatížení při použití EventHubConsumerClient knihovny @azure/event-hubs

Zdrojový kód | Balíček (npm) | Referenční dokumentace k | rozhraní APIVzorky

Začínáme

Instalace balíčku

Instalace knihovny objektů blob Azure Event Hubs Checkpoint Store pomocí npm

npm install @azure/eventhubs-checkpointstore-blob

Předpoklady: Abyste mohli tento balíček používat, musíte mít předplatné Azure, obor názvů služby Event Hubs a účet úložiště.

Pokud tento balíček používáte v aplikaci Node.js, použijte Node.js 8.x nebo novější.

Konfigurace TypeScriptu

Uživatelé TypeScriptu musí mít nainstalované definice typů uzlů:

npm install @types/node

Musíte také povolit compilerOptions.allowSyntheticDefaultImports soubor tsconfig.json. Všimněte si, že pokud jste povolili compilerOptions.esModuleInterop, allowSyntheticDefaultImports je ve výchozím nastavení povolená. Další informace najdete v příručce k možnostem kompilátoru TypeScriptu .

Klíčové koncepty

  • Měřítko: Vytvořte několik příjemců, přičemž každý z nich převezme vlastnictví čtení z několika oddílů služby Event Hubs.

  • Vyrovnávání zatížení: Aplikace, které podporují vyrovnávání zatížení, se skládají z jedné nebo několika instancí EventHubConsumerClient , jejichž konfigurace byla nakonfigurována tak, aby využívala události ze stejného centra událostí a skupiny příjemců a stejné CheckpointStoreskupiny . Vyrovnávají zatížení mezi různými instancemi rozdělením oddílů, které se mají zpracovat mezi sebe.

  • Vytváření kontrolních bodů: Jedná se o proces, kterým čtenáři označí nebo potvrdí svou pozici v sekvenci událostí oddílu. Za vytváření kontrolních bodů zodpovídá příjemce. Proces probíhá na bázi oddílů ve skupinách příjemců. Taková zodpovědnost znamená, že si každý čtenář oddílu v každé skupině příjemců musí udržovat přehled o své aktuální pozici v datovém proudu událostí a může informovat službu, když bude považovat datový proud za dokončený.

    Pokud se čtenář z oddílu odpojí, začne při opětovném připojení číst od kontrolního bodu, který dříve zaslal poslední čtenář daného oddílu z této skupiny příjemců. Když se čtenář připojí, předá posun do centra událostí a určí umístění, ve kterém se má začít číst. Takto můžete vytváření kontrolních bodů použít jak k označování událostí jako „dokončených“, tak k zajištění ochrany pro případ, že nastane selhání u čtenářů spuštěných na různých strojích. Ke starším datům se je možné vrátit tak, že určíte nižší posun od tohoto kontrolního bodu. Díky tomuto mechanismu umožňuje vytváření kontrolních bodů nejen obnovu při selhání, ale i opakované přehrání datového proudu.

    BlobCheckpointStore je třída, která implementuje klíčové metody vyžadované EventHubConsumerClient pro vyrovnávání zatížení a aktualizace kontrolních bodů.

Příklady

Vytvoření CheckpointStore pomocí Azure Blob Storage

Pomocí následujícího fragmentu kódu vytvořte CheckpointStore. Ke svému účtu úložiště budete muset zadat připojovací řetězec.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Události kontrolního bodu s využitím služby Azure Blob Storage

Pokud chcete zkontrolovat události přijaté pomocí Azure Blob Storage, budete muset předat objekt, který je kompatibilní s rozhraním SubscriptionEventHandlers, spolu s kódem pro volání updateCheckpoint() metody .

V tomto příkladu SubscriptionHandlers implementuje SubscriptionEventHandlers a také zpracovává kontrolní body.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Poradce při potížích

Povolení protokolů

Můžete nastavit proměnnou AZURE_LOG_LEVEL prostředí na jednu z následujících hodnot, abyste povolili protokolování do stderr:

  • verbose
  • Info
  • upozornění
  • error

Úroveň protokolu můžete také programově nastavit importem balíčku @azure/logger a zavoláním setLogLevel funkce s jednou z hodnot na úrovni protokolu.

Při programovém nastavení úrovně protokolu nebo prostřednictvím AZURE_LOG_LEVEL proměnné prostředí se vygenerují všechny protokoly zapsané pomocí úrovně protokolu, která je stejná nebo menší než ta, kterou zvolíte. Pokud například nastavíte úroveň protokolu na info, protokoly, které jsou zapsány pro úrovně warning a error jsou také generovány. Tato sada SDK při určování úrovně, ke které se má protokolovat, se řídí pokyny sady Azure SDK pro TypeScript.

Případně můžete nastavit proměnnou prostředí pro DEBUG získání protokolů při použití této knihovny. To může být užitečné, pokud chcete také generovat protokoly ze závislostí rhea-promise a rhea také.

Poznámka: AZURE_LOG_LEVEL, pokud je nastavená, má přednost před laděním. Při zadávání AZURE_LOG_LEVEL nebo volání setLogLevel nezadávejte žádné azure knihovny prostřednictvím funkce DEBUG.

Pokud použijete tuto knihovnu, můžete nastavit následující proměnnou prostředí, abyste získali protokoly ladění.

  • Získání pouze protokolů ladění na úrovni informací z objektu blob úložiště kontrolních bodů EventHubs
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Protokolování do souboru

  • Povolte protokolování, jak je znázorněno výše, a pak následujícím způsobem spusťte testovací skript:

    • Příkazy protokolování z testovacího skriptu přejdou do out.log a příkazy protokolování ze sady SDK se přejdou do debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • Příkazy protokolování z testovacího skriptu a sady SDK přejdou do stejného souboru out.log přesměrováním stderru na stdout (&1) a pak přesměrují stdout do souboru:

      node your-test-script.js >out.log 2>&1
      
    • Příkazy protokolování z testovacího skriptu a sady SDK přejdou do stejného souboru out.log.

      node your-test-script.js &> out.log
      

Další kroky

Podrobný příklad najdete v adresáři samples .

Přispívání

Pokud chcete přispívat do této knihovny, přečtěte si příručku pro přispívání , kde najdete další informace o tom, jak sestavit a otestovat kód.

Imprese