Realizar operaciones comunes con la biblioteca cliente de Event Hubs

Completado

Esta unidad contiene ejemplos de operaciones comunes que puede realizar con la biblioteca cliente de Event Hubs (Azure.Messaging.EventHubs) para interactuar con una instancia de Event Hubs.

Inspección de Event Hubs

Muchas operaciones de Event Hubs tienen lugar dentro del ámbito de una partición específica. Dado que las particiones son propiedad de Event Hubs, sus nombres se asignan en el momento en que se crean. Para comprender qué particiones están disponibles, consulte el centro de eventos mediante uno de los clientes de Event Hubs. A modo de ilustración, se muestra EventHubProducerClient en estos ejemplos, pero el concepto y la forma son comunes entre los clientes.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Publicación de eventos en Event Hubs

Para publicar eventos, deberá crear un elemento EventHubProducerClient. Los productores publican eventos por lotes y pueden solicitar una partición específica o permitir que el servicio Event Hubs decida en qué eventos de partición se deben publicar. Se recomienda usar el enrutamiento automático cuando la publicación de eventos necesite tener una alta disponibilidad o cuando los datos de eventos se deban distribuir entre las particiones de manera uniforme. Nuestro ejemplo aprovecha el enrutamiento automático.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Lectura de eventos desde una instancia de Event Hubs

Para leer eventos de una instancia de Event Hubs, debe crear un elemento EventHubConsumerClient para un grupo de consumidores determinado. Cuando se crea una instancia de Event Hubs, se proporciona un grupo de consumidores predeterminado que se puede usar para empezar a explorar Event Hubs. En nuestro ejemplo, nos centraremos en leer todos los eventos publicados en Event Hubs mediante un iterador.

Nota:

Es importante tener en cuenta que este enfoque de consumo está pensado para mejorar la experiencia de exploración de la biblioteca cliente de Event Hubs y la creación de prototipos. Se recomienda no utilizarlo en escenarios de producción. Para su uso en producción, se recomienda usar el cliente de procesador de eventos, ya que proporciona una experiencia más sólida y mejorada.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Lectura de eventos desde una partición de Event Hubs

Para leer de una partición específica, el consumidor necesita especificar en qué parte del flujo de eventos quiere empezar a recibirlos; en nuestro ejemplo, nos centramos en leer todos los eventos publicados para la primera partición de Event Hubs.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Procesamiento de eventos mediante un cliente de procesador de eventos

En la mayoría de los escenarios de producción, se recomienda usar EventProcessorClient para leer y procesar eventos. Dado que EventProcessorClient tiene una dependencia de los blobs de Azure Storage para la persistencia de su estado, debe proporcionar un elemento BlobContainerClient para el procesador, que se ha configurado para la cuenta de almacenamiento y el contenedor que se deben usar.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}