Azure Event Hubs biblioteca cliente del procesador de eventos para .NET: versión 5.9.3

Azure Event Hubs es un servicio de publicación y suscripción altamente escalable que puede ingerir millones de eventos por segundo y transmitirlos a varios consumidores. Esto le permite procesar y analizar grandes cantidades de datos generados por los dispositivos y aplicaciones conectados. Una vez que Event Hubs ha recopilado los datos, puede recuperarlos, transformarlos y almacenarlos mediante cualquier proveedor de análisis en tiempo real o con adaptadores de almacenamiento o procesamiento por lotes. Si desea obtener más información sobre Azure Event Hubs, puede que desee revisar: ¿Qué es Event Hubs?

La biblioteca cliente del procesador de eventos es un complemento de la biblioteca cliente de Azure Event Hubs, lo que proporciona un cliente independiente para consumir eventos de una manera sólida, duradera y escalable que es adecuada para la mayoría de los escenarios de producción. Una implementación con opiniones creada mediante blobs de Azure Storage, se recomienda el procesador de eventos para:

  • Leer y procesar eventos en todas las particiones de un centro de eventos a escala con resistencia a errores transitorios y problemas de red intermitentes.

  • Procesar eventos de forma cooperativa, donde varios procesadores distribuyen y comparten dinámicamente la responsabilidad en el contexto de un grupo de consumidores, administrando correctamente la carga a medida que se agregan y quitan del grupo.

  • Administración de puntos de control y estado para el procesamiento de forma duradera mediante blobs de Azure Storage como almacén de datos subyacente.

Código | fuentePaquete (NuGet) | Documentación | de referencia de APIDocumentación | del productoGuía de solución de problemas

Introducción

Requisitos previos

  • Suscripción de Azure: Para usar los servicios de Azure, incluida Azure Event Hubs, necesitará una suscripción. Si no tiene una cuenta de Azure existente, puede registrarse para obtener una evaluación gratuita o usar las ventajas de la suscripción de Visual Studio al crear una cuenta.

  • Espacio de nombres de Event Hubs con un centro de eventos: Para interactuar con Azure Event Hubs, también deberá tener un espacio de nombres y un centro de eventos disponibles. Si no está familiarizado con la creación de recursos de Azure, puede seguir la guía paso a paso para crear un centro de eventos mediante el Azure Portal. Allí también puede encontrar instrucciones detalladas para usar la CLI de Azure, Azure PowerShell o plantillas de Azure Resource Manager (ARM) para crear un centro de eventos.

  • Cuenta de Azure Storage con Blob Storage: Para conservar los puntos de control y controlar la propiedad en Azure Storage, deberá tener una cuenta de Azure Storage con blobs disponibles. La cuenta de Azure Storage que se usa para el procesador debe tener deshabilitado la eliminación temporal y el control de versiones de blobs. Si no está familiarizado con las cuentas de Azure Storage, puede seguir la guía paso a paso para crear una cuenta de almacenamiento mediante el Azure Portal. Allí también puede encontrar instrucciones detalladas para usar la CLI de Azure, Azure PowerShell o plantillas de Azure Resource Manager (ARM) para crear cuentas de almacenamiento.

  • Contenedor de blobs de Azure Storage: Los datos de punto de control y propiedad de Azure Storage se escribirán en blobs de un contenedor específico. EventProcessorClient requiere un contenedor existente y no creará implícitamente uno para ayudar a protegerse contra la configuración accidental incorrecta. Se recomienda usar un contenedor único para cada combinación de centro de eventos y grupo de consumidores. Si no está familiarizado con los contenedores de Azure Storage, puede consultar la documentación sobre la administración de contenedores. Allí puede encontrar instrucciones detalladas para usar .NET, la CLI de Azure o Azure PowerShell para crear un contenedor.

  • C# 8.0: La biblioteca cliente de Azure Event Hubs usa nuevas características que se introdujeron en C# 8.0. Para aprovechar la sintaxis de C# 8.0, se recomienda compilar con el SDK de .NET Core 3.0 o posterior con una versión de lenguaje de .latest

    Los usuarios de Visual Studio que deseen aprovechar al máximo la sintaxis de C# 8.0 tendrán que usar Visual Studio 2019 o posterior. Visual Studio 2019, incluida la edición gratuita Community, se puede descargar aquí. Los usuarios de Visual Studio 2017 pueden aprovechar la sintaxis de C# 8 haciendo uso del paquete NuGet Microsoft.Net.Compilers y estableciendo la versión del lenguaje, aunque es posible que la experiencia de edición no sea ideal.

    Todavía puede usar la biblioteca con versiones anteriores del lenguaje C#, pero tendrá que administrar miembros asincrónicos enumerables y asincrónicos descartables manualmente en lugar de beneficiarse de la nueva sintaxis. Puede seguir teniendo como destino cualquier versión de marco compatible con el SDK de .NET Core, incluidas las versiones anteriores de .NET Core o .NET Framework. Para obtener más información, vea: cómo especificar marcos de destino.

    Nota importante: Para compilar o ejecutar los ejemplos y los ejemplos sin modificaciones, es necesario usar C# 11.0. Todavía puede ejecutar los ejemplos si decide ajustarlos para otras versiones de lenguaje.

Para crear rápidamente los recursos necesarios en Azure y recibir cadenas de conexión para ellos, puede implementar nuestra plantilla de ejemplo haciendo clic en:

Implementación en Azure

Instalar el paquete

Instale la biblioteca cliente del procesador de eventos de Azure Event Hubs para .NET mediante NuGet:

dotnet add package Azure.Messaging.EventHubs.Processor

Autenticar el cliente

Obtención de una cadena de conexión de Event Hubs

Para que la biblioteca cliente de Event Hubs interactúe con un centro de eventos, deberá comprender cómo conectarse y autorizarla. Los medios más fáciles para hacerlo es usar una cadena de conexión, que se crea automáticamente al crear un espacio de nombres de Event Hubs. Si no está familiarizado con el uso de cadenas de conexión con Event Hubs, puede seguir la guía paso a paso para obtener una cadena de conexión de Event Hubs.

Obtención de una cadena de conexión de Azure Storage

Para que el cliente del procesador de eventos use blobs de Azure Storage para la creación de puntos de comprobación, deberá comprender cómo conectarse a una cuenta de almacenamiento y autorizarla. El método más sencillo de hacerlo es usar una cadena de conexión, que se genera en el momento en que se crea la cuenta de almacenamiento. Si no está familiarizado con la autorización de cadena de conexión de la cuenta de almacenamiento en Azure, puede seguir la guía paso a paso para configurar las cadenas de conexión de Azure Storage.

Una vez que tenga las cadenas de conexión, consulte Creación de un cliente de procesador de eventos para obtener un ejemplo de cómo usarlos para crear el procesador.

Conceptos clave

  • Un procesador de eventos es una construcción destinada a administrar las responsabilidades asociadas a la conexión a un centro de eventos determinado y a los eventos de procesamiento de cada una de sus particiones, en el contexto de un grupo de consumidores específico. El acto de procesamiento de eventos leídos desde la partición y el tratamiento de los errores que se producen se delega el procesador de eventos en el código que proporciona, lo que permite que la lógica se centre en entregar valor empresarial mientras el procesador controla las tareas asociadas a eventos de lectura, administra las particiones y permite que el estado se conserve en forma de puntos de control.

  • La creación de puntos de comprobación es un proceso por el que los lectores marcan y conservan su posición para los eventos que se han procesado para una partición. Los puntos de comprobación son responsabilidad del consumidor y se producen en una partición, normalmente en el contexto de un grupo de consumidores específico. EventProcessorClientPara , esto significa que, para un grupo de consumidores y una combinación de particiones, el procesador debe realizar un seguimiento de su posición actual en el flujo de eventos. Si desea obtener más información, consulte puntos de comprobación en la documentación del producto de Event Hubs.

    Cuando se conecta un procesador de eventos, comenzará a leer eventos en el punto de control que el último procesador de esa partición de ese grupo de consumidores conservaba anteriormente, si existe uno. A medida que un procesador de eventos lee y actúa sobre eventos de la partición, debe crear periódicamente puntos de control para marcar los eventos como "completos" por las aplicaciones de nivel inferior y proporcionar resistencia si se produce un error en el procesador de eventos o el entorno que lo hospeda. Si es necesario, es posible volver a procesar eventos marcados previamente como "completos" especificando un desplazamiento anterior a través de este proceso de control.

  • Una partición es una secuencia ordenada de eventos que se mantiene en un centro de eventos. Las particiones son un medio de organización de datos asociado al paralelismo requerido por los consumidores de eventos. Azure Event Hubs proporciona streaming de mensajes a través de un patrón de consumidor con particiones en el que cada consumidor solo lee un subconjunto específico, o partición, de la secuencia de mensajes. A medida que llegan eventos más recientes, se agregan al final de esta secuencia. El número de particiones se especifica en el momento en que se crea un centro de eventos y no se puede modificar.

  • Un grupo de consumidores es una vista de un centro de eventos completo. Los grupos de consumidores habilitan a varias aplicaciones de consumo para que cada una de ellas tenga una vista independiente de la secuencia de eventos, y para que lea la secuencia de manera independiente a su propio ritmo y desde su propia ubicación. Puede haber como máximo cinco lectores simultáneos en una partición por grupo de consumidores; sin embargo, se recomienda que solo haya un consumidor activo para un emparejamiento determinado de partición y grupo de consumidores. Cada lector activo recibe todos los eventos de su partición; si hay varios lectores en la misma partición, recibirán eventos duplicados.

Para obtener más conceptos y una explicación más detallada, consulte: Características de Event Hubs.

Duración del cliente

EventProcessorClient es seguro almacenar en caché y usar como singleton durante la vigencia de la aplicación, que es el procedimiento recomendado cuando los eventos se leen periódicamente. Los clientes son responsables de la administración eficaz de la red, la CPU y el uso de memoria, trabajando para mantener el uso bajo durante períodos de inactividad. Se requiere llamar StopProcessingAsync al procesador o StopProcessing en el procesador para asegurarse de que los recursos de red y otros objetos no administrados se limpien correctamente.

Seguridad para subprocesos

Garantizamos que todos los métodos de instancia de cliente sean seguros para subprocesos e independientes entre sí (guía). Esto garantiza que la recomendación de reutilizar instancias de cliente siempre es segura, incluso entre subprocesos.

Los tipos de modelo de datos, como EventData y EventDataBatch no son seguros para subprocesos. No deben compartirse entre subprocesos ni usarse simultáneamente con métodos de cliente.

Conceptos adicionales

Opciones | de clienteControladores de eventos | Control de errores | Diagnóstico | Simulación (procesador) | Simulación (tipos de cliente)

Ejemplos

Creación de un cliente de procesador de eventos

Dado que EventProcessorClient tiene una dependencia de los blobs de Azure Storage para la persistencia de su estado, deberá proporcionar un BlobContainerClient para el procesador, que se ha configurado para la cuenta de almacenamiento y el contenedor que se deben usar. El contenedor usado para configurar debe EventProcessorClient existir.

EventProcessorClient Dado que no tiene ninguna manera de conocer la intención de especificar un contenedor que no existe, no creará implícitamente el contenedor. Esto actúa como protección contra un contenedor mal configurado, lo que provoca que un procesador no autorizado no pueda compartir la propiedad e interferir con otros procesadores del grupo de consumidores.

// The container specified when creating the BlobContainerClient must exist; it will
// not be implicitly created.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

Configurar los controladores de eventos y errores

Para usar , EventProcessorClientse deben proporcionar controladores para el procesamiento de eventos y los errores. Estos controladores se consideran independientes y los desarrolladores son responsables de garantizar que se tengan en cuenta las excepciones dentro del código del controlador.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

Iniciar y detener el procesamiento

EventProcessorClient realizará su procesamiento en segundo plano una vez que se haya iniciado explícitamente y continúe haciéndolo hasta que se haya detenido explícitamente. Aunque esto permite que el código de la aplicación realice otras tareas, también asume la responsabilidad de asegurarse de que el proceso no finaliza durante el procesamiento si no se realizan otras tareas.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

Uso de una entidad de seguridad de Active Directory con el cliente del procesador de eventos

La biblioteca de identidades de Azure proporciona compatibilidad con la autenticación de Azure Active Directory que se puede usar para las bibliotecas cliente de Azure, incluidos Event Hubs y Azure Storage.

Para usar una entidad de seguridad de Active Directory, se especifica una de las credenciales disponibles de la Azure.Identity biblioteca al crear el cliente de Event Hubs. Además, el espacio de nombres completo de Event Hubs y el nombre del centro de eventos deseado se proporcionan en lugar de la cadena de conexión de Event Hubs.

Para usar una entidad de seguridad de Active Directory con contenedores de blobs de Azure Storage, se debe proporcionar la dirección URL completa al contenedor al crear el cliente de almacenamiento. Puede encontrar detalles sobre los formatos de URI válidos para acceder a Blob Storage en Nomenclatura y referencia a contenedores, blobs y metadatos.

var credential = new DefaultAzureCredential();
var blobStorageUrl ="<< FULLY-QUALIFIED CONTAINER URL (like https://myaccount.blob.core.windows.net/mycontainer) >>";

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(new Uri(blobStorageUrl), credential);

var processor = new EventProcessorClient
(
    storageClient,
    consumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    credential
);

Al usar Azure Active Directory con Event Hubs, se debe asignar a la entidad de seguridad un rol que permita leer desde Event Hubs, como el Azure Event Hubs Data Receiver rol. Para más información sobre el uso de la autorización de Azure Active Directory con Event Hubs, consulte la documentación asociada.

Al usar Azure Active Directory con Azure Storage, la entidad de seguridad debe tener asignado un rol que permita el acceso de lectura, escritura y eliminación a blobs, como el Storage Blob Data Contributor rol. Para más información sobre el uso de la autorización de Active Directory con Azure Storage, consulte la documentación asociada y el ejemplo de autorización de Azure Storage.

Solución de problemas

Para obtener información detallada sobre la solución de problemas, consulte la Guía de solución de problemas de Event Hubs.

Control de excepciones

Excepciones de cliente del procesador de eventos

El cliente del procesador de eventos realiza todos los intentos de ser resistentes frente a excepciones y realizará las acciones necesarias para continuar procesando a menos que sea imposible hacerlo. No se necesita ninguna acción de los desarrolladores para que esto tenga lugar; forma parte de forma nativa del comportamiento del procesador.

Para permitir a los desarrolladores inspeccionar y reaccionar ante las excepciones que se producen en las operaciones del cliente del procesador de eventos, se muestran a través del ProcessError evento . Los argumentos de este evento ofrecen detalles sobre la excepción y el contexto en el que se observó. Los desarrolladores pueden realizar operaciones normales en el cliente del procesador de eventos desde dentro de este controlador de eventos, como detenerlo o reiniciarlo en respuesta a errores, pero no puede influir en el comportamiento de excepción del procesador.

Para obtener un ejemplo básico de implementación del controlador de errores, consulte el ejemplo: Controladores de procesador de eventos.

Excepciones en controladores de eventos

Dado que el cliente del procesador de eventos carece del contexto adecuado para comprender la gravedad de las excepciones dentro de los controladores de eventos que proporcionan los desarrolladores, no puede asumir qué acciones serían una respuesta razonable a ellas. Como resultado, los desarrolladores se consideran responsables de las excepciones que se producen dentro de los controladores de eventos que proporcionan mediante try/catch bloques y otras construcciones de lenguaje estándar.

El cliente del procesador de eventos no intentará detectar excepciones en el código de desarrollador ni exponerlas explícitamente. El comportamiento resultante dependerá del entorno de hospedaje del procesador y del contexto en el que se llamó al controlador de eventos. Dado que esto puede variar entre distintos escenarios, se recomienda encarecidamente a los desarrolladores codificar sus controladores de eventos de forma defensiva y tener en cuenta posibles excepciones.

Registro y diagnósticos

La biblioteca cliente del procesador de eventos está totalmente instrumentada para registrar información en varios niveles de detalle mediante .NET EventSource para emitir información. El registro se realiza para cada operación y sigue el patrón de marcar el punto inicial de la operación, su finalización y las excepciones encontradas. También se registra información adicional que puede ofrecer información en el contexto de la operación asociada.

Los registros de cliente del procesador de eventos están disponibles para cualquiera EventListener mediante la participación en el origen denominado "Azure-Messaging-EventHubs-Processor-EventProcessorClient" o la participación en todos los orígenes que tienen el rasgo "AzureEventSource". Para facilitar la captura de registros de las bibliotecas cliente de Azure, la Azure.Core biblioteca usada por Event Hubs ofrece un AzureEventSourceListener. Puede encontrar más información en Captura de registros de Event Hubs mediante AzureEventSourceListener.

La biblioteca del procesador de eventos también se instrumenta para el seguimiento distribuido mediante Application Insights o OpenTelemetry. Puede encontrar más información en el ejemplo Azure.Core Diagnostics.

Pasos siguientes

Más allá de los escenarios descritos, la biblioteca de procesadores de Azure Event Hubs ofrece compatibilidad con escenarios adicionales para ayudar a aprovechar el conjunto de características completo de EventProcessorClient. Para ayudar a explorar algunos de estos escenarios, la biblioteca cliente del procesador de Event Hubs ofrece un proyecto de ejemplos para servir como ilustración para escenarios comunes. Consulte los ejemplos Léame para obtener más información.

Contribuciones

Este proyecto agradece las contribuciones y sugerencias. La mayoría de las contribuciones requieren que acepte un Contrato de licencia para el colaborador (CLA) que declara que tiene el derecho a concedernos y nos concede los derechos para usar su contribución. Para más detalles, visite https://cla.microsoft.com.

Cuando se envía una solicitud de incorporación de cambios, un bot de CLA determinará de forma automática si tiene que aportar un CLA y completar la PR adecuadamente (por ejemplo, la etiqueta, el comentario). Solo siga las instrucciones que le dará el bot. Solo será necesario que lo haga una vez en todos los repositorios con nuestro CLA.

Este proyecto ha adoptado el Código de conducta de Microsoft Open Source. Para más información, consulte las preguntas más frecuentes del código de conducta o póngase en contacto con opencode@microsoft.com si tiene cualquier otra pregunta o comentario.

Consulte nuestra guía de contribución para obtener más información.

Impresiones