Share via


Azure Event Hubs klientbibliotek för JavaScript – version 5.11.4

Azure Event Hubs är en mycket skalbar publiceringsprenumereringstjänst som kan mata in miljontals händelser per sekund och strömma dem till flera konsumenter. På så sätt kan du bearbeta och analysera de enorma mängder data som produceras av dina anslutna enheter och program. Om du vill veta mer om Azure Event Hubs kanske du vill granska: Vad är Event Hubs?

Med Azure Event Hubs klientbibliotek kan du skicka och ta emot händelser i Node.js program.

Nyckellänkar:

Obs! Om du använder version 2.1.0 eller senare och vill migrera till den senaste versionen av det här paketet kan du titta på vår migreringsguide för att flytta från EventHubs V2 till EventHubs V5

Exempel på v2 och dokumentation finns fortfarande här:

Källkod för v2.1.0 | Paket för v2.1.0 (npm) | Exempel för v2.1.0

Komma igång

Installera paketet

Installera Azure Event Hubs-klientbiblioteket med npm

npm install @azure/event-hubs

Miljöer som stöds för närvarande

Mer information finns i vår supportprincip .

Förutsättningar

Konfigurera TypeScript

TypeScript-användare måste ha definitioner av nodtyp installerade:

npm install @types/node

Du måste också aktivera compilerOptions.allowSyntheticDefaultImports i din tsconfig.json. Observera att om du har aktiverat compilerOptions.esModuleInteropallowSyntheticDefaultImports är aktiverad som standard. Mer information finns i TypeScripts handbok för kompilatoralternativ .

JavaScript-paket

Om du vill använda det här klientbiblioteket i webbläsaren måste du först använda en bundler. Mer information om hur du gör detta finns i vår paketeringsdokumentation.

Utöver vad som beskrivs där behöver det här biblioteket även ytterligare polyfiller för följande inbyggda NodeJS-kärnmoduler för att fungera korrekt i webbläsaren:

  • buffer
  • os
  • path
  • process

Paketera med Webpack

Om du använder Webpack v5 kan du installera följande dev-beroenden

  • npm install --save-dev os-browserify path-browserify

lägg sedan till följande i din webpack.config.js

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

Paketering med sammanslagning

Om du använder Sammanslagningspaket installerar du följande dev-beroenden

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

Ta sedan med följande i rollup.config.js

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

Mer information om hur du använder polyfiller finns i dokumentationen för ditt favoritpaket.

React Native support

I likhet med webbläsare stöder React Native inte vissa JavaScript-API:er som används av det här SDK-biblioteket, så du måste ange polyfiller för dem. Mer information finns i exemplet meddelande React Native med Expo.

Autentisera klienten

Interaktion med Event Hubs börjar med antingen en instans av klassen EventHubConsumerClient eller en instans av klassen EventHubProducerClient . Det finns konstruktoröverlagringar som stöder olika sätt att instansiera dessa klasser enligt nedan:

Använda anslutningssträng för Event Hubs-namnområdet

En av konstruktorns överlagringar tar en anslutningssträng av formuläret Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key; och entitetsnamnet till din Event Hub-instans. Du kan skapa en konsumentgrupp och hämta anslutningssträng samt entitetsnamnet från Azure Portal.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub"
);

Använda anslutningssträng för principer på händelsehubben

En annan konstruktoröverlagring tar anslutningssträng som motsvarar den princip för delad åtkomst som du har definierat direkt på Event Hub-instansen (och inte Event Hubs-namnområdet). Den här anslutningssträng kommer att vara i formuläret Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name. Den viktigaste skillnaden i anslutningssträng format från den tidigare konstruktorns överlagring är ;EntityPath=my-event-hub-name.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path"
);

Använda Event Hubs-namnområdet och Azure Identity

Den här konstruktorns överlagring tar värdnamnet och entitetsnamnet för din Event Hub-instans och autentiseringsuppgifter som implementerar TokenCredential-gränssnittet. På så sätt kan du autentisera med ett Azure Active Directory-huvudnamn. Det finns implementeringar av TokenCredential gränssnittet i @azure/identitetspaketet . Värdnamnet har formatet <yournamespace>.servicebus.windows.net. När du använder Azure Active Directory måste ditt huvudnamn tilldelas en roll som ger åtkomst till Event Hubs, till exempel rollen Azure Event Hubs dataägare. Mer information om hur du använder Azure Active Directory-auktorisering med Event Hubs finns i den associerade dokumentationen.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential
);

Viktiga begrepp

  • En Event Hub-producent är en källa till telemetridata, diagnostikinformation, användningsloggar eller andra loggdata, som en del av en inbäddad enhetslösning, ett mobilt enhetsprogram, en speltitel som körs på en konsol eller annan enhet, en klient- eller serverbaserad affärslösning eller en webbplats.

  • En Event Hub-konsument hämtar sådan information från händelsehubben och bearbetar den. Bearbetningen kan omfatta aggregering, komplex beräkning och filtrering. Bearbetningen kan också omfatta distribution eller lagring av informationen på ett obehandlat eller transformerat sätt. Event Hub-konsumenter är ofta robusta och storskaliga plattformsinfrastrukturdelar med inbyggda analysfunktioner som Azure Stream Analytics, Apache Spark eller Apache Storm.

  • En partition är en ordnad sekvens av händelser som hålls i en händelsehubb. Partitioner är ett sätt att organisera data som är associerade med den parallellitet som krävs av händelsekonsumenter. Azure Event Hubs tillhandahåller meddelandeströmning via ett partitionerat konsumentmönster där varje konsument endast läser en viss delmängd, eller partition, av meddelandeströmmen. När nya händelser anländer läggs de till i slutet av denna sekvens. Antalet partitioner anges när en händelsehubb skapas och kan inte ändras.

  • En konsumentgrupp är en vy över en hel händelsehubb. Konsumentgrupper gör det möjligt för flera förbrukande program att var och en har en separat vy över händelseströmmen och att läsa dataströmmen oberoende av varandra i sin egen takt och från sin egen position. Det kan finnas högst 5 samtidiga läsare på en partition per konsumentgrupp. Vi rekommenderar dock att det bara finns en aktiv konsument för en viss partition och parkoppling av konsumentgrupper. Varje aktiv läsare tar emot alla händelser från partitionen. Om det finns flera läsare på samma partition får de duplicerade händelser.

Fler begrepp och djupare diskussion finns i: Event Hubs-funktioner

Vägledning kring återförsök

Och EventHubConsumerClientEventHubProducerClient accepterar options var du kan ange retryOptions som gör att du kan justera hur SDK hanterar tillfälliga fel. Exempel på tillfälliga fel är tillfälliga nätverk eller tjänstproblem.

Återförsök vid användning av händelser

Om ett tillfälligt fel (t.ex. ett tillfälligt nätverksproblem) påträffas när SDK:n tar emot händelser försöker den ta emot händelser igen baserat på de återförsöksalternativ som skickas till EventHubConsumerClient. Om det maximala återförsöket är slut processError anropas funktionen.

Du kan använda återförsöksinställningarna för att styra hur snabbt du får information om tillfälliga problem, till exempel ett problem med nätverksanslutningen. Om du till exempel behöver veta när det finns ett nätverksproblem direkt kan du sänka värdena för maxRetries och retryDelayInMs.

När funktionen har körts processError fortsätter klienten att ta emot händelser från partitionen så länge felet kunde försöka igen. Annars anropar klienten funktionen som tillhandahålls av processClose användaren. Den här funktionen anropas också när du stoppar prenumerationen eller när klienten slutar läsa händelser från den aktuella partitionen på grund av att den hämtas av en annan instans av ditt program som en del av belastningsutjämningen.

Funktionen processClose ger en möjlighet att uppdatera kontrollpunkter om det behövs. När du har kört processCloseanropar klienten (eller vid belastningsutjämning, en klient från en annan instans av ditt program) funktionen som tillhandahålls processInitialize av användaren för att återuppta läsningshändelser från den senast uppdaterade kontrollpunkten för samma partition.

Om du vill sluta försöka läsa händelser måste du anropa close() den subscription som returneras av subscribe metoden.

Exempel

Följande avsnitt innehåller kodfragment som täcker några av de vanliga uppgifterna med hjälp av Azure Event Hubs

Inspektera en händelsehubb

Många Event Hub-åtgärder utförs inom omfånget för en specifik partition. Eftersom partitioner ägs av händelsehubben tilldelas deras namn när de skapas. För att förstå vilka partitioner som är tillgängliga kan du fråga händelsehubben med någon av de två tillgängliga klienterna: EventHubProducerClient eller EventHubConsumerClient

I exemplet nedan använder vi en EventHubProducerClient.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubProducerClient("connectionString", "eventHubName");

  const partitionIds = await client.getPartitionIds();

  await client.close();
}

main();

Publicera händelser till en händelsehubb

För att kunna publicera händelser måste du skapa en EventHubProducerClient. Exemplet nedan visar ett sätt att skapa klienten på, men se avsnittet Autentisera klienten för att lära dig andra sätt att instansiera klienten.

Du kan publicera händelser till en specifik partition eller tillåta att Event Hubs-tjänsten bestämmer vilka partitionshändelser som ska publiceras till. Vi rekommenderar att du använder automatisk routning när publiceringen av händelser måste ha hög tillgänglighet eller när händelsedata ska distribueras jämnt mellan partitionerna. I exemplet nedan använder vi automatisk routning.

  • Skapa ett EventDataBatch objekt med createBatch
  • Lägg till händelser i batchen med hjälp av metoden tryAdd . Du kan göra detta tills den maximala batchstorleksgränsen har uppnåtts eller tills du är klar med att lägga till antalet händelser som du har gillat, beroende på vilket som kommer först. Den här metoden returnerar false för att ange att inga fler händelser kan läggas till i batchen på grund av att den maximala batchstorleken har uppnåtts.
  • Skicka batchen med händelser med hjälp av metoden sendBatch .

I exemplet nedan försöker vi skicka 10 händelser till Azure Event Hubs.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const producerClient = new EventHubProducerClient("connectionString", "eventHubName");

  const eventDataBatch = await producerClient.createBatch();
  let numberOfEventsToSend = 10;

  while (numberOfEventsToSend > 0) {
    let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
    if (!wasAdded) {
      break;
    }
    numberOfEventsToSend--;
  }

  await producerClient.sendBatch(eventDataBatch);
  await producerClient.close();
}

main();

Det finns alternativ som du kan skicka i olika steg för att styra processen för att skicka händelser till Azure Event Hubs.

  • Konstruktorn EventHubProducerClient tar en valfri parameter av typen EventHubClientOptions som du kan använda för att ange alternativ som antal återförsök.
  • Metoden createBatch använder en valfri parameter av typen CreateBatchOptions som du kan använda för att speicifiera den maximala batchstorlek som stöds av batchen som skapas.
  • Metoden sendBatch tar en valfri parameter av typen SendBatchOptions som du kan använda för att ange abortSignal för att avbryta den aktuella åtgärden.
  • Om du vill skicka till en specifik partition kan du med en överlagring av sendBatch metoden skicka ID:t för partitionen som händelser ska skickas till. Exemplet Inspektera en händelsehubb ovan visar hur du hämtar de tillgängliga partitions-ID:na.

Obs! När du arbetar med Azure Stream Analytics ska även brödtexten för händelsen som skickas vara ett JSON-objekt. Exempelvis: body: { "message": "Hello World" }

Använda händelser från en händelsehubb

Om du vill använda händelser från en Event Hub-instans behöver du också veta vilken konsumentgrupp du vill rikta in dig på. När du vet detta är du redo att skapa en EventHubConsumerClient. Exemplet nedan visar ett sätt att skapa klienten på, men se avsnittet Autentisera klienten för att lära dig andra sätt att instansiera klienten.

Metoden subscribe på klienten har överbelastningar som i kombination med konstruktorn kan hantera flera olika sätt att använda händelser:

Metoden subscribe tar en valfri parameter av typen SubscriptionOptions som du kan använda för att ange alternativ som maxBatchSize (antal händelser att vänta på) och maxWaitTimeInSeconds (tid att vänta på att maxBatchSize-händelser ska tas emot).

Använda händelser i en enda process

Börja med att skapa en instans av EventHubConsumerClientoch anropa sedan metoden på den subscribe() för att börja använda händelser.

Metoden subscribe tar återanrop för att bearbeta händelser när de tas emot från Azure Event Hubs. Om du vill sluta ta emot händelser kan du anropa close() på objektet som returneras av subscribe() metoden .

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Använda händelser med belastningsutjämning över flera processer

Azure Event Hubs kan hantera miljontals händelser per sekund. Om du vill skala bearbetningsprogrammet kan du köra flera instanser av ditt program och låta det balansera belastningen sinsemellan.

Börja med att skapa en instans av med hjälp av EventHubConsumerClient en av konstruktorns överlagringar som tar en CheckpointStore, och anropa subscribe() sedan metoden för att börja använda händelser. Kontrollpunktsarkivet gör det möjligt för prenumeranterna i en konsumentgrupp att samordna bearbetningen mellan flera instanser av ditt program.

I det här exemplet använder BlobCheckpointStore vi från @azure/eventhubs-checkpointstore-blob paketet som implementerar nödvändiga läsningar/skrivningar till ett beständigt arkiv med hjälp av Azure Blob Storage.

Metoden subscribe tar återanrop för att bearbeta händelser när de tas emot från Azure Event Hubs. Om du vill sluta ta emot händelser kan du anropa close() på objektet som returneras av subscribe() metoden .

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

async function main() {
  const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

  if (!(await blobContainerClient.exists())) {
    await blobContainerClient.create();
  }

  const checkpointStore = new BlobCheckpointStore(blobContainerClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      // event processing code goes here
      if (events.length === 0) {
        // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
        // will pass you an empty array.
        return;
      }

      // Checkpointing will allow your service to pick up from
      // where it left off when restarting.
      //
      // You'll want to balance how often you checkpoint with the
      // performance of your underlying checkpoint store.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      // handle any errors that occur during the course of
      // this subscription
      console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
    }
  });

  // Wait for a few seconds to receive events before closing
  await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

  await subscription.close();
  await consumerClient.close();
  console.log(`Exiting sample`);
}

main();

Mer information finns i Balansera partitionsbelastning över flera instanser av ditt program .

Använda händelser från en enda partition

Börja med att skapa en instans av EventHubConsumerClientoch anropa sedan metoden på den subscribe() för att börja använda händelser. Skicka ID:t för partitionen som du vill rikta till subscribe() metoden för att endast använda från partitionen.

I exemplet nedan använder vi den första partitionen.

Metoden subscribe tar återanrop för att bearbeta händelser när de tas emot från Azure Event Hubs. Om du vill sluta ta emot händelser kan du anropa close() på objektet som returneras av subscribe() metoden .

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const partitionIds = await client.getPartitionIds();

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    partitionIds[0],
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Använda EventHubConsumerClient för att arbeta med IotHub

Du kan även använda EventHubConsumerClient för att arbeta med IotHub. Detta är användbart för att ta emot telemetridata för IotHub från den länkade EventHub. Den associerade anslutningssträng kommer inte att ha skicka anspråk, vilket innebär att det inte går att skicka händelser.

  • Observera att anslutningssträng måste vara för en Event Hub-kompatibel slutpunkt (t.ex. "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name")
const { EventHubConsumerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
  );
  await client.getEventHubProperties();
  // retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
  const partitionId = "0";
  await client.getPartitionProperties(partitionId);

  await client.close();
}

main();

Felsökning

AMQP-beroenden

Event Hubs-biblioteket är beroende av rhea-promise-biblioteket för att hantera anslutningar, skicka och ta emot händelser via AMQP-protokollet .

Loggning

Du kan ange AZURE_LOG_LEVEL miljövariabeln för att aktivera loggning till stderr:

export AZURE_LOG_LEVEL=verbose

Mer detaljerade anvisningar om hur du aktiverar loggar finns i dokumentationen om @azure-/loggningspaket.

Du kan också ange DEBUG miljövariabeln för att hämta loggar när du använder det här biblioteket. Detta kan vara användbart om du även vill generera loggar från beroenden rhea-promise och rhea även.

Observera: AZURE_LOG_LEVEL, om det anges, har företräde framför FELSÖKNING. Ange inte några azure bibliotek via DEBUG när du även anger AZURE_LOG_LEVEL eller anropar setLogLevel.

  • Hämtar endast felsökningsloggar på informationsnivå från Event Hubs SDK.
export DEBUG=azure:*:info
  • Hämta felsökningsloggar från Event Hubs SDK och biblioteket på protokollnivå.
export DEBUG=azure*,rhea*
  • Om du inte är intresserad av att visa rådata för händelser (som förbrukar en stor mängd konsol-/diskutrymme) kan du ange DEBUG miljövariabeln på följande sätt:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
  • Om du bara är intresserad av fel och SDK-varningar kan du ange DEBUG miljövariabeln på följande sätt:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

Nästa steg

Mer exempelkod

Ta en titt på exempelkatalogen för detaljerade exempel på hur du använder det här biblioteket för att skicka och ta emot händelser till och från Event Hubs.

Bidra

Om du vill bidra till det här biblioteket kan du läsa bidragsguiden om du vill veta mer om hur du skapar och testar koden.

Visningar