Compartir a través de


biblioteca cliente de Azure Event Hubs para JavaScript: versión 5.12.0

Azure Event Hubs es un servicio de publicación y suscripción altamente escalable que puede ingerir millones de eventos por segundo y transmitirlos a varios consumidores. Esto le permite procesar y analizar las grandes cantidades de datos generados por los dispositivos y aplicaciones conectados. Si desea obtener más información sobre Azure Event Hubs, puede que desee revisar: ¿Qué es Event Hubs?

La biblioteca cliente de Azure Event Hubs permite enviar y recibir eventos en la aplicación Node.js.

Vínculos principales:

NOTA: Si usa la versión 2.1.0 o inferior y desea migrar a la versión más reciente de este paquete, consulte nuestra guía de migración para pasar de EventHubs V2 a EventHubs V5.

Los ejemplos de la versión 2 y la documentación siguen estando disponibles aquí:

Código fuente de v2.1.0 | Paquete para v2.1.0 (npm) | Ejemplos de v2.1.0

Introducción

Instalar el paquete

Instalación de la biblioteca cliente de Azure Event Hubs mediante npm

npm install @azure/event-hubs

Entornos admitidos actualmente

Para más información, consulte la directiva de compatibilidad.

Requisitos previos

Configurar TypeScript

Los usuarios de TypeScript deben tener instaladas definiciones de tipo de nodo:

npm install @types/node

También debe habilitar compilerOptions.allowSyntheticDefaultImports en la tsconfig.json. Tenga en cuenta que si ha habilitado , allowSyntheticDefaultImports está habilitado compilerOptions.esModuleInteropde forma predeterminada. Consulte el manual de opciones del compilador de TypeScript para obtener más información.

Paquete de JavaScript

Para usar esta biblioteca cliente en el explorador, primero debe usar un empaquetador. Para más información sobre cómo hacerlo, consulte nuestra documentación de agrupación.

Además de lo que se describe allí, esta biblioteca también necesita polirrellenes adicionales para los siguientes módulos integrados principales de NodeJS para funcionar correctamente en los exploradores:

  • buffer
  • os
  • path
  • process

Agrupación con Webpack

Si usa Webpack v5, puede instalar las siguientes dependencias de desarrollo.

  • npm install --save-dev os-browserify path-browserify

a continuación, agregue lo siguiente al webpack.config.js

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

Agrupación con acumulación

Si usa el empaquetador acumulativo, instale las siguientes dependencias de desarrollo.

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

A continuación, incluya lo siguiente en el rollup.config.js

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

Consulte la documentación de su empaquetador favorito para obtener más información sobre el uso de polyfills.

compatibilidad con React Native

De forma similar a los exploradores, React Native no admite algunas API de JavaScript que usa esta biblioteca del SDK, por lo que debe proporcionar polirrellenes para ellos. Consulte el ejemplo de mensajería React Native con Expo para obtener más detalles.

Autenticar el cliente

La interacción con Event Hubs comienza con una instancia de la clase EventHubConsumerClient o una instancia de la clase EventHubProducerClient . Hay sobrecargas de constructor para admitir diferentes formas de crear instancias de estas clases, como se muestra a continuación:

Uso de cadena de conexión para el espacio de nombres de Event Hubs

Una de las sobrecargas del constructor toma una cadena de conexión del formulario Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key; y el nombre de entidad en la instancia del centro de eventos. Puede crear un grupo de consumidores y obtener el cadena de conexión, así como el nombre de entidad del Azure Portal.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub"
);

Uso de cadena de conexión para la directiva en el centro de eventos

Otra sobrecarga del constructor toma el cadena de conexión correspondiente a la directiva de acceso compartido que ha definido directamente en la instancia del centro de eventos (y no en el espacio de nombres de Event Hubs). Este cadena de conexión tendrá el formato Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name. La diferencia clave en el formato cadena de conexión de la sobrecarga del constructor anterior es ;EntityPath=my-event-hub-name.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path"
);

Uso del espacio de nombres de Event Hubs y la identidad de Azure

Esta sobrecarga del constructor toma el nombre de host y el nombre de entidad de la instancia del centro de eventos y la credencial que implementa la interfaz TokenCredential. Esto le permite autenticarse mediante una entidad de seguridad de Azure Active Directory. Hay implementaciones de la TokenCredential interfaz disponibles en el paquete de @azure/identidad . El nombre de host tiene el formato <yournamespace>.servicebus.windows.net. Al usar Azure Active Directory, se debe asignar a la entidad de seguridad un rol que permita el acceso a Event Hubs, como el rol propietario de datos Azure Event Hubs. Para más información sobre el uso de la autorización de Azure Active Directory con Event Hubs, consulte la documentación asociada.

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential
);

Conceptos clave

  • Un productor del centro de eventos es un origen de datos de telemetría, información de diagnóstico, registros de uso u otros datos de registro, como parte de una solución de dispositivo insertada, una aplicación de dispositivo móvil, un título de juego que se ejecuta en una consola u otro dispositivo, alguna solución empresarial basada en cliente o servidor o un sitio web.

  • Un consumidor de Event Hubs recoge dicha información del centro de eventos y la procesa. El procesamiento puede implicar agregación, cálculo complejo y filtrado. El procesamiento también puede implicar la distribución o el almacenamiento de la información sin procesar o transformada. Los consumidores de centros de eventos suelen ser componentes sólidos y a gran escala de la infraestructura de la plataforma con funcionalidades de análisis integradas, como Azure Stream Analytics, Apache Spark o Apache Storm.

  • Una partición es una secuencia ordenada de eventos que se mantiene en un centro de eventos. Las particiones son un medio de organización de datos asociado al paralelismo requerido por los consumidores de eventos. Azure Event Hubs proporciona streaming de mensajes a través de un patrón de consumidor con particiones en el que cada consumidor solo lee un subconjunto específico, o partición, de la secuencia de mensajes. A medida que llegan eventos más recientes, se agregan al final de esta secuencia. El número de particiones se especifica en el momento en que se crea un centro de eventos y no se puede modificar.

  • Un grupo de consumidores es una vista de un centro de eventos completo. Los grupos de consumidores habilitan a varias aplicaciones de consumo para que cada una de ellas tenga una vista independiente de la secuencia de eventos, y para que lea la secuencia de manera independiente a su propio ritmo y desde su propia ubicación. Puede haber como máximo cinco lectores simultáneos en una partición por grupo de consumidores; sin embargo, se recomienda que solo haya un consumidor activo para un emparejamiento determinado de partición y grupo de consumidores. Cada lector activo recibe todos los eventos de su partición; Si hay varios lectores en la misma partición, recibirán eventos duplicados.

Para más conceptos y una explicación más detallada, consulte: Características de Event Hubs.

Instrucciones sobre los reintentos

y EventHubConsumerClientEventHubProducerClient acepta options dónde puede establecer que le permite ajustar cómo retryOptions controla el SDK los errores transitorios. Entre los ejemplos de errores transitorios se incluyen problemas temporales de red o servicio.

Reintentos al consumir eventos

Si se produce un error transitorio (por ejemplo, un problema de red temporal) mientras el SDK recibe eventos, reintentará la recepción de eventos en función de las opciones de reintento pasadas a EventHubConsumerClient. Si se agotan los intentos de reintento máximos, se invocará la processError función .

Puede usar la configuración de reintento para controlar la rapidez con la que se le informa sobre problemas temporales, como un problema de conexión de red. Por ejemplo, si necesita saber cuándo hay un problema de red inmediatamente, puede reducir los valores de maxRetries y retryDelayInMs.

Después de ejecutar la processError función, el cliente sigue recibiendo eventos de la partición siempre y cuando el error fuera un reintento. De lo contrario, el cliente invoca la función proporcionada por processClose el usuario. Esta función también se invoca cuando se detiene la suscripción o cuando el cliente deja de leer eventos de la partición actual debido a que otra instancia de la aplicación la recoge como parte del equilibrio de carga.

La processClose función proporciona una oportunidad para actualizar los puntos de control si es necesario. Después de ejecutar processClose, el cliente (o en el caso del equilibrio de carga, un cliente de otra instancia de la aplicación) invocará la función proporcionada por processInitialize el usuario para reanudar la lectura de eventos desde el último punto de control actualizado para la misma partición.

Si desea dejar de intentar leer eventos, debe llamar a close() en el subscription devuelto por el subscribe método .

Ejemplos

En las secciones siguientes se proporcionan fragmentos de código que abarcan algunas de las tareas comunes mediante Azure Event Hubs

Inspección de un centro de eventos

Muchas operaciones del centro de eventos tienen lugar dentro del ámbito de una partición específica. Dado que las particiones son propiedad del centro de eventos, sus nombres se asignan en el momento en que se crean. Para comprender qué particiones están disponibles, consulte el centro de eventos mediante cualquiera de los dos clientes disponibles: EventHubProducerClient o EventHubConsumerClient

En el ejemplo siguiente, se usa .EventHubProducerClient

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubProducerClient("connectionString", "eventHubName");

  const partitionIds = await client.getPartitionIds();

  await client.close();
}

main();

Publicación de eventos en un centro de eventos

Para publicar eventos, deberá crear un elemento EventHubProducerClient. Aunque en el ejemplo siguiente se muestra una manera de crear el cliente, consulte la sección Autenticar el cliente para obtener otras formas de crear instancias del cliente.

Puede publicar eventos en una partición específica o permitir que el servicio Event Hubs decida en qué eventos de partición se deben publicar. Se recomienda usar el enrutamiento automático cuando la publicación de eventos debe ser de alta disponibilidad o cuando los datos del evento se deben distribuir uniformemente entre las particiones. En el ejemplo siguiente, aprovecharemos el enrutamiento automático.

  • Create un EventDataBatch objeto mediante createBatch
  • Agregue eventos al lote mediante el método tryAdd . Puede hacerlo hasta que se alcance el límite máximo de tamaño de lote o hasta que haya terminado de agregar el número de eventos que quiera, lo que ocurra primero. Este método devolvería false para indicar que no se pueden agregar más eventos al lote debido a que se alcanza el tamaño máximo del lote.
  • Envíe el lote de eventos mediante el método sendBatch .

En el ejemplo siguiente, intentamos enviar 10 eventos a Azure Event Hubs.

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const producerClient = new EventHubProducerClient("connectionString", "eventHubName");

  const eventDataBatch = await producerClient.createBatch();
  let numberOfEventsToSend = 10;

  while (numberOfEventsToSend > 0) {
    let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
    if (!wasAdded) {
      break;
    }
    numberOfEventsToSend--;
  }

  await producerClient.sendBatch(eventDataBatch);
  await producerClient.close();
}

main();

Hay opciones que puede pasar en diferentes fases para controlar el proceso de envío de eventos a Azure Event Hubs.

  • El EventHubProducerClient constructor toma un parámetro opcional de tipo EventHubClientOptions que puede usar para especificar opciones como el número de reintentos.
  • El createBatch método toma un parámetro opcional de tipo CreateBatchOptions que puede usar para especificar el tamaño máximo de lote admitido por el lote que se va a crear.
  • El sendBatch método toma un parámetro opcional de tipo SendBatchOptions que puede usar para especificar abortSignal para cancelar la operación actual.
  • En caso de que quiera enviar a una partición específica, una sobrecarga del sendBatch método permite pasar el identificador de la partición a la que enviar eventos. En el ejemplo inspeccionar un centro de eventos anterior se muestra cómo capturar los identificadores de particiones disponibles.

Nota: Al trabajar con Azure Stream Analytics, el cuerpo del evento que se envía también debe ser un objeto JSON. Por ejemplo: body: { "message": "Hello World" }

Consumo de eventos de un centro de eventos

Para consumir eventos de una instancia del centro de eventos, también debe saber a qué grupo de consumidores quiere dirigirse. Una vez que lo sepa, está listo para crear un eventHubConsumerClient. Aunque en el ejemplo siguiente se muestra una manera de crear el cliente, consulte la sección Autenticar el cliente para obtener otras formas de crear instancias del cliente.

El subscribe método del cliente tiene sobrecargas que, combinadas con el constructor, pueden satisfacer varias maneras de consumir eventos:

El subscribe método toma un parámetro opcional de tipo SubscriptionOptions que se puede usar para especificar opciones como maxBatchSize (número de eventos a esperar) y maxWaitTimeInSeconds (cantidad de tiempo para esperar a que lleguen los eventos maxBatchSize).

Consumo de eventos en un único proceso

Empiece por crear una instancia de EventHubConsumerClienty, a continuación, llame al subscribe() método en él para empezar a consumir eventos.

El subscribe método toma devoluciones de llamada para procesar eventos a medida que se reciben de Azure Event Hubs. Para dejar de recibir eventos, puede llamar close() al objeto devuelto por el subscribe() método .

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Consumo de eventos con equilibrio de carga en varios procesos

Azure Event Hubs es capaz de tratar millones de eventos por segundo. Para escalar la aplicación de procesamiento, puede ejecutar varias instancias de la aplicación y equilibrar la carga entre sí.

Empiece por crear una instancia de EventHubConsumerClient mediante una de las sobrecargas del constructor que toman un CheckpointStorey, a continuación, llame al subscribe() método para empezar a consumir eventos. El almacén de puntos de control permitirá a los suscriptores de un grupo de consumidores coordinar el procesamiento entre varias instancias de la aplicación.

En este ejemplo, usaremos desde BlobCheckpointStore el @azure/eventhubs-checkpointstore-blob paquete que implementa las lecturas y escrituras necesarias en un almacén duradero mediante Azure Blob Storage.

El subscribe método toma devoluciones de llamada para procesar eventos a medida que se reciben de Azure Event Hubs. Para dejar de recibir eventos, puede llamar close() al objeto devuelto por el subscribe() método .

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

async function main() {
  const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

  if (!(await blobContainerClient.exists())) {
    await blobContainerClient.create();
  }

  const checkpointStore = new BlobCheckpointStore(blobContainerClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      // event processing code goes here
      if (events.length === 0) {
        // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
        // will pass you an empty array.
        return;
      }

      // Checkpointing will allow your service to pick up from
      // where it left off when restarting.
      //
      // You'll want to balance how often you checkpoint with the
      // performance of your underlying checkpoint store.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      // handle any errors that occur during the course of
      // this subscription
      console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
    }
  });

  // Wait for a few seconds to receive events before closing
  await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

  await subscription.close();
  await consumerClient.close();
  console.log(`Exiting sample`);
}

main();

Consulte Equilibrio de la carga de particiones entre varias instancias de la aplicación para más información.

Consumo de eventos de una sola partición

Empiece por crear una instancia de EventHubConsumerClienty, a continuación, llame al subscribe() método en él para empezar a consumir eventos. Pase el identificador de la partición a la que desea dirigirse al subscribe() método para consumir solo desde esa partición.

En el ejemplo siguiente, se usa la primera partición.

El subscribe método toma devoluciones de llamada para procesar eventos a medida que se reciben de Azure Event Hubs. Para dejar de recibir eventos, puede llamar close() al objeto devuelto por el subscribe() método .

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const partitionIds = await client.getPartitionIds();

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    partitionIds[0],
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

Uso de EventHubConsumerClient para trabajar con IotHub

También puede usar EventHubConsumerClient para trabajar con IotHub. Esto es útil para recibir datos de telemetría de IotHub desde el centro de eventos vinculado. El cadena de conexión asociado no tendrá notificaciones de envío, por lo que no es posible enviar eventos.

  • Tenga en cuenta que el cadena de conexión debe ser para un punto de conexión compatible con el centro de eventos (por ejemplo, "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name")
const { EventHubConsumerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
  );
  await client.getEventHubProperties();
  // retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
  const partitionId = "0";
  await client.getPartitionProperties(partitionId);

  await client.close();
}

main();

Solución de problemas

Dependencias de AMQP

La biblioteca de Event Hubs depende de la biblioteca rhea-promise para administrar conexiones, enviar y recibir eventos a través del protocolo AMQP .

Registro

Puede establecer la variable de entorno para habilitar el AZURE_LOG_LEVEL registro en stderr:

export AZURE_LOG_LEVEL=verbose

Para obtener instrucciones más detalladas sobre cómo habilitar los registros, consulte los documentos del paquete @azure/logger.

También puede establecer la DEBUG variable de entorno para obtener registros al usar esta biblioteca. Esto puede ser útil si también desea emitir registros de las dependencias rhea-promise y rhea también.

Nota: AZURE_LOG_LEVEL, si se establece, tiene prioridad sobre DEBUG. No especifique ninguna azure biblioteca a través de DEBUG cuando también especifique AZURE_LOG_LEVEL o llame a setLogLevel.

  • Obtener solo los registros de depuración de nivel de información del SDK de Event Hubs.
export DEBUG=azure:*:info
  • Obtención de registros de depuración desde el SDK de Event Hubs y la biblioteca de nivel de protocolo.
export DEBUG=azure*,rhea*
  • Si no está interesado en ver los datos de eventos sin procesar (que consume una gran cantidad de espacio en disco o consola), puede establecer la DEBUG variable de entorno de la siguiente manera:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
  • Si solo está interesado en errores y advertencias del SDK, puede establecer la DEBUG variable de entorno de la siguiente manera:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

Pasos siguientes

Más código de ejemplo

Eche un vistazo al directorio de ejemplos para obtener ejemplos detallados de cómo usar esta biblioteca para enviar y recibir eventos hacia y desde Event Hubs.

Contribuciones

Si desea contribuir a esta biblioteca, lea la guía de contribución para obtener más información sobre cómo compilar y probar el código.

Impresiones