Compartir a través de


Azure Event Hubs biblioteca de almacén de puntos de control para Javascript mediante blobs de almacenamiento

Una solución basada en Azure Blob Storage para almacenar puntos de control y ayudar en el equilibrio de carga al usar EventHubConsumerClient desde la biblioteca @azure/event-hubs

Código | fuentePaquete (npm) | Documentación | de referencia de APIMuestras

Introducción

Instalar el paquete

Instalación de la biblioteca de blobs del almacén de puntos de control de Azure Event Hubs mediante npm

npm install @azure/eventhubs-checkpointstore-blob

Requisitos previos: debe tener una suscripción de Azure, un espacio de nombres de Event Hubs para usar este paquete y una cuenta de almacenamiento.

Si usa este paquete en una aplicación de Node.js, use Node.js 8.x o superior.

Configurar Typescript

Los usuarios de TypeScript deben tener instaladas definiciones de tipo de nodo:

npm install @types/node

También debe habilitar compilerOptions.allowSyntheticDefaultImports en tsconfig.json. Tenga en cuenta que si ha habilitado , allowSyntheticDefaultImports está habilitado compilerOptions.esModuleInteropde forma predeterminada. Consulte el manual de opciones del compilador de TypeScript para obtener más información.

Conceptos clave

  • Escalado: cree varios consumidores y que cada consumidor tome posesión de la lectura desde varias particiones de Event Hubs.

  • Equilibrio de carga: Las aplicaciones que admiten el equilibrio de carga constan de una o varias instancias de EventHubConsumerClient las cuales se han configurado para consumir eventos del mismo centro de eventos y del mismo grupo de consumidores y del mismo CheckpointStore. Equilibran la carga de trabajo entre distintas instancias mediante la distribución de las particiones que se van a procesar entre sí.

  • Puntos de control: Es un proceso por el que los lectores marcan o confirman su posición dentro de una secuencia de eventos de partición. La creación de puntos de comprobación es responsabilidad del consumidor y se realiza por partición dentro de un grupo de consumidores. Esta responsaibilidad significa que por cada grupo de consumidores, cada lector de la partición debe realizar un seguimiento de su posición actual en el flujo del evento y puede informar al servicio cuando considere que el flujo de datos se ha completado.

    Si se desconecta un lector de una partición, cuando se vuelve a conectar comienza a leer en el punto de comprobación que envió previamente el último lector de esa partición en ese grupo de consumidores. Cuando se conecta el lector, pasa este desplazamiento al centro de eventos para especificar la ubicación en la que se va a empezar a leer. De este modo, puede usar puntos de comprobación para marcar eventos como "completados" por las aplicaciones de bajada y para ofrecer resistencia en caso de que se produzca una conmutación por error entre lectores que se ejecutan en máquinas distintas. Es posible volver a los datos más antiguos especificando un desplazamiento inferior desde este proceso de puntos de comprobación. Mediante este mecanismo, los puntos de comprobación permiten una resistencia a la conmutación por error y una reproducción del flujo de eventos.

    BlobCheckpointStore es una clase que implementa métodos clave requeridos por EventHubConsumerClient para equilibrar los puntos de control de carga y actualización.

Ejemplos

Creación de un CheckpointStore objeto mediante Azure Blob Storage

Use el fragmento de código siguiente para crear un CheckpointStore. Deberá proporcionar el cadena de conexión a la cuenta de almacenamiento.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Eventos de punto de comprobación mediante Azure Blob Storage

Para controlar los eventos recibidos mediante Azure Blob Storage, deberá pasar un objeto compatible con la interfaz SubscriptionEventHandlers junto con el código para llamar al updateCheckpoint() método .

En este ejemplo, SubscriptionHandlers implementa SubscriptionEventHandlers y también controla los puntos de comprobación.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Solución de problemas

Habilitamiento de registros

Puede establecer la AZURE_LOG_LEVEL variable de entorno en uno de los valores siguientes para habilitar el registro en stderr:

  • verbose
  • info
  • warning
  • error

También puede establecer el nivel de registro mediante programación importando el paquete @azure/registrador y llamando a la setLogLevel función con uno de los valores de nivel de registro.

Al establecer un nivel de registro mediante programación o a través de la AZURE_LOG_LEVEL variable de entorno, se emitirán todos los registros escritos mediante un nivel de registro igual o menor que el que elija. Por ejemplo, al establecer el nivel infode registro en , los registros que se escriben para los niveles warning y error también se emiten. Este SDK sigue las directrices del SDK de Azure para TypeScript al determinar en qué nivel se debe iniciar sesión.

También puede establecer la DEBUG variable de entorno para obtener registros al usar esta biblioteca. Esto puede ser útil si también desea emitir registros de las dependencias rhea-promise y rhea también.

Nota: AZURE_LOG_LEVEL, si se establece, tiene prioridad sobre DEBUG. No especifique ninguna azure biblioteca a través de DEBUG cuando también especifique AZURE_LOG_LEVEL o llame a setLogLevel.

Puede establecer la siguiente variable de entorno para obtener los registros de depuración cuando se usa esta biblioteca.

  • Obtener solo los registros de depuración de nivel de información del blob De almacén de puntos de control de Eventhubs.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Registro en un archivo

  • Habilite el registro como se muestra anteriormente y, a continuación, ejecute el script de prueba como se indica a continuación:

    • Las instrucciones de registro del script de prueba van a out.log y las instrucciones de registro del sdk van a debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • Las instrucciones de registro del script de prueba y el SDK van al mismo archivo out.log redirigiendo stderr a stdout (&1) y, a continuación, redirigen stdout a un archivo:

      node your-test-script.js >out.log 2>&1
      
    • Las instrucciones de registro del script de prueba y el SDK van al mismo archivo out.log.

      node your-test-script.js &> out.log
      

Pasos siguientes

Eche un vistazo al directorio de ejemplos para obtener un ejemplo detallado.

Contribuciones

Si desea contribuir a esta biblioteca, lea la guía de contribución para obtener más información sobre cómo compilar y probar el código.

Impresiones