Executar operações comuns com a biblioteca de cliente dos Hubs de Eventos

Concluído

Esta unidade contém exemplos de operações comuns que você pode executar com a biblioteca de cliente de Hubs de Eventos (Azure.Messaging.EventHubs) para interagir com Hubs de Eventos.

Inspecionar Hubs de Eventos

Muitas operações de Hubs de Eventos ocorrem dentro do escopo de uma partição específica. Como os Hubs de Eventos são proprietários das partições, seus nomes são atribuídos no momento da criação. Para entender quais partições estão disponíveis, consulte os Hubs de Eventos usando um dos clientes de Hubs de Eventos. Para ilustrar, o EventHubProducerClient é demonstrado nestes exemplos, mas o conceito e a forma são comuns entre os clientes.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Publicar eventos em Hubs de Eventos

Para publicar eventos, você precisa criar um EventHubProducerClientarquivo . Os produtores publicam eventos em lotes e podem solicitar uma partição específica ou permitir que o serviço Hubs de Eventos decida em quais eventos de partição devem ser publicados. Recomendamos o uso do roteamento automático quando a publicação de eventos precisa estar altamente disponível ou quando os dados de eventos devem ser distribuídos uniformemente entre as partições. O nosso exemplo tira partido do encaminhamento automático.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Ler eventos de Hubs de Eventos

Para ler eventos de um Hubs de Eventos, você precisa criar um EventHubConsumerClient para um determinado grupo de consumidores. Quando um Hubs de Eventos é criado, ele fornece um grupo de consumidores padrão que pode ser usado para começar a explorar Hubs de Eventos. Em nosso exemplo, nos concentramos na leitura de todos os eventos publicados nos Hubs de Eventos usando um iterador.

Nota

É importante notar que essa abordagem de consumo destina-se a melhorar a experiência de exploração da biblioteca cliente e prototipagem dos Hubs de Eventos. Recomenda-se que não seja utilizado em cenários de produção. Para uso em produção, recomendamos o uso do Event Processor Client, pois ele fornece uma experiência mais robusta e de alto desempenho.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Ler eventos de uma partição de Hubs de Eventos

Para ler a partir de uma partição específica, o consumidor precisa especificar onde no fluxo de eventos para começar a receber eventos. Em nosso exemplo, nos concentramos na leitura de todos os eventos publicados para a primeira partição dos Hubs de Eventos.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Processar eventos usando um cliente do Processador de Eventos

Para a maioria dos cenários de produção, a recomendação é usar EventProcessorClient para ler e processar eventos. Como o EventProcessorClient tem uma dependência de blobs de Armazenamento do Azure para persistência de seu estado, você precisa fornecer um BlobContainerClient para o processador, que foi configurado para a conta de armazenamento e o contêiner que devem ser usados.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}