Sdílet prostřednictvím


Azure Event Hubs klientské knihovny pro JavaScript – verze 5.12.0

Azure Event Hubs je vysoce škálovatelná služba publikování a odběru, která může ingestovat miliony událostí za sekundu a streamovat je více příjemcům. To vám umožní zpracovávat a analyzovat obrovské objemy dat generovaných připojenými zařízeními a aplikacemi. Pokud se chcete o Azure Event Hubs dozvědět více, můžete si projít téma Co je Event Hubs?

Klientská knihovna Azure Event Hubs umožňuje odesílat a přijímat události v aplikaci Node.js.

Klíčové odkazy:

POZNÁMKA: Pokud používáte verzi 2.1.0 nebo nižší a chcete migrovat na nejnovější verzi tohoto balíčku, projděte si průvodce migrací a přejděte z EventHubs v2 na EventHubs v5.

Ukázky pro verzi 2 a dokumentace jsou stále k dispozici tady:

Zdrojový kód pro verzi 2.1.0 | Balíček pro verzi 2.1.0 (npm) | Ukázky pro verzi 2.1.0

Začínáme

Instalace balíčku

Instalace klientské knihovny Azure Event Hubs pomocí npm

npm install @azure/event-hubs

Aktuálně podporovaná prostředí

  • LtS verze Node.js
  • Nejnovější verze prohlížečů Safari, Chrome, Edge a Firefox.

Další podrobnosti najdete v našich zásadách podpory .

Požadavky

Konfigurace TypeScriptu

Uživatelé TypeScriptu musí mít nainstalované definice typů uzlů:

npm install @types/node

Musíte také povolit compilerOptions.allowSyntheticDefaultImports v tsconfig.json. Všimněte si, že pokud jste povolili compilerOptions.esModuleInterop, allowSyntheticDefaultImports je ve výchozím nastavení povolená. Další informace najdete v příručce k možnostem kompilátoru TypeScriptu .

JavaScript Bundle

Pokud chcete tuto klientskou knihovnu používat v prohlížeči, musíte nejprve použít bundler. Podrobnosti o tom, jak to udělat, najdete v naší dokumentaci k sdružování.

Kromě toho, co je zde popsáno, potřebuje tato knihovna také další polyfills pro následující předdefinované moduly jádra NodeJS, aby mohla správně fungovat v prohlížečích:

  • buffer
  • os
  • path
  • process

Sdružování s Webpackem

Pokud používáte Webpack v5, můžete nainstalovat následující vývojové závislosti.

  • npm install --save-dev os-browserify path-browserify

a pak do webpack.config.js přidejte následující

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

Sdružování s kumulativní aktualizací

Pokud používáte nástroj Rollup Bundler, nainstalujte následující vývojové závislosti.

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

Pak do rollup.config.js uveďte následující:

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

Další informace o používání polyfillů najdete v dokumentaci k vašemu oblíbenému balíkovači.

podpora React Native

Podobně jako u prohlížečů React Native nepodporuje některé javascriptové rozhraní API používané touto knihovnou sady SDK, takže pro ně musíte poskytnout polyfills. Další podrobnosti najdete v ukázce React Native zasílání zpráv s expo.

Ověření klienta

Interakce se službou Event Hubs začíná instancí třídy EventHubConsumerClient nebo instancí třídy EventHubProducerClient . Existují přetížení konstruktoru pro podporu různých způsobů vytváření instancí těchto tříd, jak je znázorněno níže:

Použití připojovací řetězec pro obor názvů služby Event Hubs

Jedno z přetížení konstruktoru přebírá připojovací řetězec formuláře Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key; a názvu entity do vaší instance centra událostí. Můžete vytvořit skupinu příjemců a získat připojovací řetězec a název entity z Azure Portal.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub"
);

Použití připojovací řetězec pro zásady v centru událostí

Další přetížení konstruktoru přebírá připojovací řetězec odpovídající zásadám sdíleného přístupu, které jste definovali přímo v instanci centra událostí (a ne v oboru názvů služby Event Hubs). Tato připojovací řetězec bude mít tvar Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name. Hlavní rozdíl ve formátu připojovací řetězec oproti předchozímu přetížení konstruktoru je .;EntityPath=my-event-hub-name

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path"
);

Použití oboru názvů služby Event Hubs a identity Azure

Toto přetížení konstruktoru přebírá název hostitele a název entity vaší instance centra událostí a přihlašovací údaje, které implementují rozhraní TokenCredential. To vám umožní ověřit pomocí objektu zabezpečení Služby Azure Active Directory. V balíčku @azure/identity jsou k dispozici implementace TokenCredential rozhraní. Název hostitele je ve formátu <yournamespace>.servicebus.windows.net. Pokud používáte Azure Active Directory, musí mít objekt zabezpečení přiřazenou roli, která umožňuje přístup ke službě Event Hubs, například roli vlastníka dat Azure Event Hubs. Další informace o použití autorizace Azure Active Directory se službou Event Hubs najdete v související dokumentaci.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential
);

Klíčové koncepty

  • Producent centra událostí je zdrojem telemetrických dat, diagnostických informací, protokolů využití nebo jiných dat protokolů jako součást řešení vložených zařízení, aplikace pro mobilní zařízení, herního titulu běžícího na konzoli nebo jiném zařízení, některého klientského nebo serverového obchodního řešení nebo webu.

  • Příjemce centra událostí takové informace z centra událostí převezme a zpracuje je. Zpracování může zahrnovat agregaci, komplexní výpočty a filtrování. Zpracování může také zahrnovat distribuci nebo ukládání informací nezpracovaným nebo transformovaným způsobem. Příjemci centra událostí jsou často robustní a vysoce škálovatelné součásti infrastruktury platformy s integrovanými analytickými funkcemi, jako jsou Azure Stream Analytics, Apache Spark nebo Apache Storm.

  • Oddíl je seřazená posloupnost událostí, která se nachází v centru událostí. Oddíly jsou prostředky organizace dat spojené s paralelismem vyžadovaným příjemci událostí. Azure Event Hubs poskytuje streamování zpráv prostřednictvím modelu rozděleného příjemce, ve kterém každý příjemce čte pouze určitou podmnožinu neboli oddíl streamu zpráv. Události, které nově přichází, se zařazují na konec této posloupnosti. Počet oddílů je zadaný v okamžiku vytvoření centra událostí a nelze ho změnit.

  • Skupina příjemců je zobrazení celého centra událostí. Skupiny příjemců umožňují, aby každá z nich měla samostatné zobrazení streamu událostí a mohla stream číst nezávisle vlastním tempem a z vlastní pozice. Na oddílu na skupinu příjemců může být maximálně 5 souběžných čtenářů. Doporučuje se však, aby pro daný oddíl a párování skupin příjemců existoval pouze jeden aktivní příjemce. Každý aktivní čtenář obdrží všechny události ze svého oddílu; Pokud je ve stejném oddílu více čtenářů, obdrží duplicitní události.

Další koncepty a hlubší diskuzi najdete v tématu Funkce služby Event Hubs.

Pokyny k opakovaným pokusům

EventHubProducerClient A EventHubConsumerClient přijměteoptions, kde můžete nastavit retryOptions nastavení, které vám umožní ladit způsob zpracování přechodných chyb v sadě SDK. Mezi přechodné chyby patří dočasné problémy se sítí nebo službami.

Opakování při využívání událostí

Pokud dojde k přechodné chybě (např. k dočasnému problému se sítí), když sada SDK přijímá události, pokusí se znovu přijmout události na základě možností opakování předaných do EventHubConsumerClient. Pokud dojde k vyčerpání maximálního počtu pokusů o opakování, processError vyvolá se funkce.

Pomocí nastavení opakování můžete řídit, jak rychle budete informováni o dočasných problémech, jako je problém s připojením k síti. Pokud například potřebujete okamžitě zjistit, kdy dojde k problému se sítí, můžete snížit hodnoty pro maxRetries a retryDelayInMs.

Po spuštění processError funkce bude klient dál přijímat události z oddílu, dokud byla chyba opakovatelná. V opačném případě klient vyvolá funkci poskytovanou processClose uživatelem. Tato funkce se také vyvolá, když buď zastavíte předplatné, nebo když klient přestane číst události z aktuálního oddílu kvůli tomu, že je v rámci vyrovnávání zatížení přebírá jiná instance vaší aplikace.

Funkce processClose umožňuje v případě potřeby aktualizovat kontrolní body. Po spuštění processClosevyvolá klient (nebo v případě vyrovnávání zatížení klient z jiné instance aplikace) funkci poskytovanou processInitialize uživatelem, která obnoví čtení událostí z posledního aktualizovaného kontrolního bodu pro stejný oddíl.

Pokud chcete ukončit pokusy o čtení událostí, musíte volat close() na subscription vrácené metodou subscribe .

Příklady

Následující části obsahují fragmenty kódu, které pokrývají některé běžné úlohy pomocí Azure Event Hubs

Kontrola centra událostí

Mnoho operací centra událostí probíhá v rámci konkrétního oddílu. Vzhledem k tomu, že oddíly vlastní centrum událostí, jejich názvy se přiřazují při vytváření. Pokud chcete zjistit, jaké oddíly jsou k dispozici, můžete dotazovat centrum událostí pomocí některého ze dvou dostupných klientů: EventHubProducerClient nebo EventHubConsumerClient

V následujícím příkladu používáme .EventHubProducerClient

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubProducerClient("connectionString", "eventHubName");

  const partitionIds = await client.getPartitionIds();

  await client.close();
}

main();

Publikování událostí do centra událostí

Aby bylo možné publikovat události, budete muset vytvořit EventHubProducerClient. I když následující příklad ukazuje jeden ze způsobů, jak vytvořit klienta, přečtěte si část Ověření klienta , kde se dozvíte o dalších způsobech vytvoření instance klienta.

Události můžete publikovat do konkrétního oddílu nebo povolit službě Event Hubs, aby se rozhodla o tom, do kterého oddílu se mají publikovat události. Automatické směrování se doporučuje používat v případě, že publikování událostí musí být vysoce dostupné nebo když mají být data událostí rovnoměrně distribuována mezi oddíly. V následujícím příkladu využijeme výhod automatického směrování.

  • Create objektu EventDataBatch pomocí metody createBatch
  • Přidejte události do dávky pomocí metody tryAdd . Můžete to udělat, dokud nedosáhnete maximálního limitu velikosti dávky nebo dokud nedokončíte přidání počtu událostí, které se vám líbí, podle toho, co nastane dřív. Tato metoda by se vrátila false , aby indikovala, že kvůli dosažení maximální velikosti dávky není možné do dávky přidat žádné další události.
  • Odešlete dávku událostí pomocí metody sendBatch .

V následujícím příkladu se pokusíme odeslat 10 událostí do Azure Event Hubs.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const producerClient = new EventHubProducerClient("connectionString", "eventHubName");

  const eventDataBatch = await producerClient.createBatch();
  let numberOfEventsToSend = 10;

  while (numberOfEventsToSend > 0) {
    let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
    if (!wasAdded) {
      break;
    }
    numberOfEventsToSend--;
  }

  await producerClient.sendBatch(eventDataBatch);
  await producerClient.close();
}

main();

Existují možnosti, které můžete předat v různých fázích a řídit proces odesílání událostí do Azure Event Hubs.

  • Konstruktor EventHubProducerClient přebírá volitelný parametr typu EventHubClientOptions , který můžete použít k určení možností, jako je počet opakovaných pokusů.
  • Metoda createBatch přebírá volitelný parametr typu CreateBatchOptions , který můžete použít k určení maximální velikosti dávky podporované vytvářenou dávkou.
  • Metoda sendBatch přebírá volitelný parametr typu SendBatchOptions , který můžete použít k určení abortSignal zrušení aktuální operace.
  • V případě, že chcete odeslat do konkrétního sendBatch oddílu, přetížení metody umožňuje předat ID oddílu, do kterého chcete odesílat události. Výše uvedený příklad Kontroly centra událostí ukazuje, jak načíst ID dostupných oddílů.

Poznámka: Při práci s Azure Stream Analytics by text odesílané události měl být také objekt JSON. Příklad: body: { "message": "Hello World" }

Využívání událostí z centra událostí

Pokud chcete využívat události z instance centra událostí, musíte také vědět, na kterou skupinu uživatelů chcete cílit. Jakmile to víte, jste připraveni vytvořit EventHubConsumerClient. I když následující příklad ukazuje jeden ze způsobů, jak vytvořit klienta, přečtěte si část Ověření klienta , kde se dozvíte o dalších způsobech vytvoření instance klienta.

Metoda subscribe na klientovi má přetížení, které v kombinaci s konstruktorem může zajišťovat několik způsobů využití událostí:

Metoda subscribe přebírá volitelný parametr typu SubscriptionOptions , který můžete použít k určení možností, jako je maxBatchSize (počet událostí, na které se mají čekat) a maxWaitTimeInSeconds (doba čekání na přijetí událostí maxBatchSize).

Využívání událostí v jednom procesu

Začněte vytvořením instance objektu EventHubConsumerClienta voláním subscribe() metody na této instanci začněte využívat události.

Metoda subscribe přebírá zpětná volání ke zpracování událostí při jejich přijetí z Azure Event Hubs. Pokud chcete zastavit příjem událostí, můžete volat close() pro objekt vrácený metodou subscribe() .

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Využívání událostí s vyrovnáváním zatížení napříč několika procesy

Azure Event Hubs dokáže zvládnout miliony událostí za sekundu. Pokud chcete škálovat aplikaci pro zpracování, můžete spustit několik instancí aplikace a nechat ji vyrovnávat zatížení mezi sebou.

Začněte vytvořením instance objektu EventHubConsumerClient pomocí jednoho z přetížení konstruktoru, které přebírají CheckpointStore, a pak zavolejte metodu subscribe() , která začne přijímat události. Úložiště kontrolních bodů umožní odběratelům v rámci skupiny příjemců koordinovat zpracování mezi několika instancemi vaší aplikace.

V tomto příkladu BlobCheckpointStore použijeme z @azure/eventhubs-checkpointstore-blob balíčku , který implementuje požadované čtení a zápisy do trvalého úložiště pomocí Azure Blob Storage.

Metoda subscribe přebírá zpětná volání ke zpracování událostí při jejich přijetí z Azure Event Hubs. Pokud chcete zastavit příjem událostí, můžete volat close() pro objekt vrácený metodou subscribe() .

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

async function main() {
  const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

  if (!(await blobContainerClient.exists())) {
    await blobContainerClient.create();
  }

  const checkpointStore = new BlobCheckpointStore(blobContainerClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      // event processing code goes here
      if (events.length === 0) {
        // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
        // will pass you an empty array.
        return;
      }

      // Checkpointing will allow your service to pick up from
      // where it left off when restarting.
      //
      // You'll want to balance how often you checkpoint with the
      // performance of your underlying checkpoint store.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      // handle any errors that occur during the course of
      // this subscription
      console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
    }
  });

  // Wait for a few seconds to receive events before closing
  await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

  await subscription.close();
  await consumerClient.close();
  console.log(`Exiting sample`);
}

main();

Další informace najdete v tématu Vyrovnávání zatížení oddílů napříč několika instancemi vaší aplikace .

Využívání událostí z jednoho oddílu

Začněte vytvořením instance objektu EventHubConsumerClienta voláním subscribe() metody na této instanci začněte využívat události. Předejte ID oddílu, na který chcete cílit, do subscribe() metody, která bude využívat pouze z tohoto oddílu.

V následujícím příkladu používáme první oddíl.

Metoda subscribe přebírá zpětná volání ke zpracování událostí při jejich přijetí z Azure Event Hubs. Pokud chcete zastavit příjem událostí, můžete volat close() pro objekt vrácený metodou subscribe() .

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const partitionIds = await client.getPartitionIds();

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    partitionIds[0],
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Použití EventHubConsumerClient pro práci s IotHubem

Můžete také použít EventHubConsumerClient k práci s IotHubem. To je užitečné pro příjem telemetrických dat IotHubu z propojeného EventHubu. Přidružená připojovací řetězec nebude mít odeslané deklarace identity, takže odesílání událostí není možné.

  • Všimněte si, že připojovací řetězec musí být pro koncový bod kompatibilní s centrem událostí (např. "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name")
const { EventHubConsumerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
  );
  await client.getEventHubProperties();
  // retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
  const partitionId = "0";
  await client.getPartitionProperties(partitionId);

  await client.close();
}

main();

Poradce při potížích

Závislosti AMQP

Knihovna Event Hubs závisí na knihovně rhea-promise pro správu připojení, odesílání a přijímání událostí přes protokol AMQP .

protokolování

Proměnnou AZURE_LOG_LEVEL prostředí můžete nastavit tak, aby povolte protokolování na stderrhodnotu :

export AZURE_LOG_LEVEL=verbose

Podrobnější pokyny k povolení protokolů najdete v dokumentaci k balíčkům @azure/protokolovacího nástroje.

Případně můžete nastavit proměnnou prostředí pro DEBUG získání protokolů při použití této knihovny. To může být užitečné, pokud chcete také generovat protokoly ze závislostí rhea-promise a rhea také.

Poznámka: AZURE_LOG_LEVEL, pokud je nastavená, má přednost před laděním. Při zadávání AZURE_LOG_LEVEL nebo volání setLogLevel nezadávejte žádné azure knihovny prostřednictvím funkce DEBUG.

  • Získání pouze protokolů ladění na úrovni informací ze sady Event Hubs SDK
export DEBUG=azure:*:info
  • Získání protokolů ladění ze sady Event Hubs SDK a knihovny na úrovni protokolu
export DEBUG=azure*,rhea*
  • Pokud nemáte zájem o zobrazení nezpracovaných dat událostí (které spotřebovávají velké množství místa na konzole nebo na disku), můžete proměnnou DEBUG prostředí nastavit následujícím způsobem:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
  • Pokud vás zajímají jenom chyby a upozornění sady SDK, můžete proměnnou DEBUG prostředí nastavit následujícím způsobem:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

Další kroky

Další vzorový kód

Podrobné příklady použití této knihovny k odesílání a přijímání událostí do a ze služby Event Hubs najdete v adresáři samples.

Přispívání

Pokud chcete přispívat do této knihovny, přečtěte si příručku pro přispívání , kde najdete další informace o tom, jak sestavit a otestovat kód.

Imprese