Megosztás a következőn keresztül:


Azure Event Hubs Checkpoint Store-kódtár JavaScripthez Storage-blobok használatával

Azure Blob Storage-alapú megoldás ellenőrzőpontok tárolására és terheléselosztás céljából a @azure/event-hubs kódtárból való használathoz EventHubConsumerClient

Forráskód | Csomag (npm) | API-referenciadokumentáció | Minták

Első lépések

A csomag telepítése

A Azure Event Hubs Checkpoint Áruház blobtárának telepítése az npm használatával

npm install @azure/eventhubs-checkpointstore-blob

Előfeltételek: A csomag használatához Rendelkeznie kell egy Azure-előfizetéssel, egy Event Hubs-névtérrel és egy Storage-fiókkal

Ha ezt a csomagot egy Node.js alkalmazásban használja, használja Node.js 8.x vagy újabb verziót.

Typescript konfigurálása

A TypeScript-felhasználóknak telepítve kell lenniük a csomóponttípus-definícióknak:

npm install @types/node

A tsconfig.json fájlban is engedélyeznie kell az engedélyezést compilerOptions.allowSyntheticDefaultImports . Vegye figyelembe, hogy ha engedélyeztecompilerOptions.esModuleInteropallowSyntheticDefaultImports, a alapértelmezés szerint engedélyezve van. További információt a TypeScript fordítóbeállítási kézikönyvében talál.

Fő fogalmak

  • Skála: Hozzon létre több fogyasztót, és mindegyik felhasználó átveszi az olvasási tulajdonjogot néhány Event Hubs-partícióból.

  • Terheléselosztás: A terheléselosztást támogató alkalmazások egy vagy több olyan példányból állnak, amelyek egy vagy több olyan példányból EventHubConsumerClient állnak, amelyek ugyanabból az eseményközpontból és fogyasztói csoportból származó események felhasználására lettek konfigurálva, és ugyanabból CheckpointStorea csoportból. Kiegyensúlyozza a számítási feladatokat a különböző példányok között a feldolgozandó partíciók elosztásával.

  • Ellenőrzőpont-készítés: Ez egy olyan folyamat, amellyel az olvasók megjelölik vagy véglegesítik pozíciójukat egy partícióesemény-sorozaton belül. Az ellenőrzőpontok használata a felhasználó felelőssége, és partíciónkénti alapon történik a felhasználói csoportban. A felelősség itt azt jelenti, hogy mindegyik felhasználói csoport esetében mindegyik partícióolvasónak nyilván kell tartania aktuális pozícióját az eseménystreamben, és tájékoztathatja a szolgáltatást, amikor az adatstreamet befejezettnek tekinti.

    Ha egy olvasó lecsatlakozik egy partícióról, az újracsatlakozáskor az adott felhasználói csoportban az adott partíció utolsó olvasója által elküldött ellenőrzőpontnál kezdi az olvasást. Amikor az olvasó csatlakozik, átadja az eltolást az eseményközpontnak, és megadja, hogy hol kezdjen olvasást. Az ellenőrzőpontok használatával az alárendelt alkalmazások így megjelölhetik az eseményeket „befejezettként”, valamint biztosítható a rugalmasság a különböző gépeken futó olvasók közötti feladatátvétel esetén. Lehetséges visszatérni a régebbi adatokhoz egy alacsonyabb értékű eltolás megadásával az ellenőrzőpontok használata során. Ezzel a mechanizmussal az ellenőrzőpontok használata rugalmasságot biztosít feladatátvétel esetén, és lehetővé teszi az eseménystream visszajátszását.

    A BlobCheckpointStore egy osztály, amely az EventHubConsumerClient által megkövetelt fő metódusokat valósítja meg a terheléselosztáshoz és az ellenőrzőpontok frissítéséhez.

Példák

Create a CheckpointStore using Azure Blob Storage

Hozzon létre egy CheckpointStorekódrészletet az alábbi kódrészlet használatával. Meg kell adnia a kapcsolati karakterlánc a tárfióknak.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Ellenőrzőpont-események az Azure Blob Storage használatával

A Azure Blob Storage használatával fogadott ellenőrzőpont-események ellenőrzéséhez át kell adnia egy, a SubscriptionEventHandlers felülettel kompatibilis objektumot a updateCheckpoint() metódus meghívásához szükséges kóddal együtt.

Ebben a példában a SubscriptionEventHandlerst implementálja, SubscriptionHandlers és kezeli az ellenőrzőpont-ellenőrzést is.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Hibaelhárítás

Naplók engedélyezése

A környezeti változót a AZURE_LOG_LEVEL következő értékek egyikére állíthatja, hogy engedélyezze a naplózást a következőre stderr:

  • részletes
  • Info
  • figyelmeztetés
  • error

A naplószintet programozott módon is beállíthatja, ha importálja a @azure/logger csomagot, és meghívja a setLogLevel függvényt a naplószint egyik értékével.

Ha a naplószintet programozott módon vagy a AZURE_LOG_LEVEL környezeti változón keresztül állítja be, a rendszer minden olyan naplót kibocsát, amely a választottnál egyenlő vagy annál kisebb naplószinttel van megírva. Ha például a naplószintet értékre infoállítja, a szintekhez warning írt és error szintén kibocsátott naplókat. Ez az SDK a TypeScripthez készült Azure SDK irányelveit követi annak meghatározásakor, hogy melyik szintre kell bejelentkezni.

Másik lehetőségként beállíthatja a DEBUG környezeti változót, hogy naplókat kapjon a kódtár használatakor. Ez akkor lehet hasznos, ha naplókat szeretne kibocsátani a függőségekből rhea-promise is rhea .

Megjegyzés: AZURE_LOG_LEVEL, ha be van állítva, elsőbbséget élvez a HIBAKERESÉS beállításnál. Ne adjon meg kódtárakat azure a DEBUG használatával, ha AZURE_LOG_LEVEL vagy a setLogLevel hívását is megadja.

A kódtár használatakor a következő környezeti változót állíthatja be a hibakeresési naplók lekéréséhez.

  • Csak információszintű hibakeresési naplók lekérése az EventHubs Checkpointstore-blobból.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Naplózás fájlba

  • Engedélyezze a naplózást a fent látható módon, majd futtassa a tesztszkriptet az alábbiak szerint:

    • A tesztszkriptből származó naplózási utasítások az oldalra, out.log a naplózási utasítások pedig az sdk-ból a következőre kerülnek debug.log: .

      node your-test-script.js > out.log 2>debug.log
      
    • A tesztszkriptből és az sdk-ből származó naplózási utasítások ugyanahhoz a fájlhoz out.log kerülnek az stderr és az stdout (&1) átirányításával, majd az stdout átirányítása egy fájlba:

      node your-test-script.js >out.log 2>&1
      
    • A tesztszkriptből és az sdk-ből származó utasítások ugyanarra a fájlra out.logkerülnek.

      node your-test-script.js &> out.log
      

Következő lépések

A részletes példaért tekintse meg a mintakönyvtárat .

Közreműködés

Ha hozzá szeretne járulni ehhez a kódtárhoz, olvassa el a közreműködői útmutatót , amelyből többet is megtudhat a kód buildeléséhez és teszteléséhez.

Megjelenések