Executar operações comuns com a biblioteca de clientes dos Hubs de Eventos

Concluído

Esta unidade contém exemplos de operações comuns que você pode executar com a biblioteca de clientes dos Hubs de Eventos (Azure.Messaging.EventHubs) para interagir com os Hubs de Eventos.

Inspecionar Hubs de Eventos

Muitas operações dos Hubs de Eventos ocorrem dentro do escopo de uma partição específica. Como as partições pertencem aos Hubs de Eventos, seus nomes são atribuídos no momento da criação. Para entender quais partições estão disponíveis, confira os Hubs de Eventos usando um dos clientes deles. Para ilustração, o EventHubProducerClient é demonstrado nesses exemplos, mas o conceito e a forma são comuns entre os clientes.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Publicar eventos nos Hubs de Eventos

Para publicar eventos, você precisa criar um EventHubProducerClient. Os produtores publicam eventos em lotes e podem solicitar uma partição específica ou permitir que o serviço de Hubs de Eventos decida em qual partição os eventos devem ser publicados. É recomendável usar o roteamento automático quando a publicação de eventos precisar ser altamente disponível ou quando os dados de evento tiverem de ser distribuídos uniformemente entre as partições. Nosso exemplo aproveita o roteamento automático.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Ler eventos de um Hub de Eventos

Para ler eventos de um Hub de Eventos, você precisa criar um EventHubConsumerClient para um determinado grupo de consumidores. Quando um Hub de Eventos é criado, ele fornece um grupo de consumidores padrão que pode ser usado para começar a explorar os Hubs de Eventos. Em nosso exemplo, nos concentraremos em ler todos os eventos que foram publicados nos Hubs de Eventos usando um iterador.

Observação

É importante observar que essa abordagem de consumo destina-se a melhorar a experiência de explorar a biblioteca de clientes dos Hubs de Eventos e a criação de protótipos. Recomendamos que ele não seja usado em cenários de produção. Para uso em produção, recomendamos usar o Cliente do Processador de Eventos, pois ele fornece uma experiência mais robusta e de desempenho.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Ler eventos por meio de um Hub de Eventos

Para ler de uma partição específica, o consumidor precisa especificar onde começa a receber eventos no fluxo de eventos; em nosso exemplo, nos concentramos em ler todos os eventos publicados na primeira partição dos Hubs de Eventos.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Processar eventos usando um cliente do Processador de Eventos

Para a maioria dos cenários de produção, recomendamos que EventProcessorClient seja usado para ler e processar eventos. Como EventProcessorClient tem uma dependência nos Blobs de Armazenamento do Azure para persistência de estado, você precisa fornecer um BlobContainerClient para o processador, que foi configurado para a conta de armazenamento e o contêiner que devem ser usados.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}