Eseguire operazioni comuni con la libreria client di Hub eventi
Questa unità contiene esempi di operazioni comuni che è possibile eseguire con la libreria client di Hub eventi (Azure.Messaging.EventHubs
) per interagire con un hub eventi.
Esaminare Hub eventi
Molte operazioni di Hub eventi vengono eseguite nell'ambito di una partizione specifica. Poiché Hub eventi è proprietario delle partizioni, i relativi nomi vengono assegnati al momento della creazione. Per conoscere le partizioni disponibili, eseguire query su Hhub eventi usando uno dei client di Hub eventi. A scopo illustrativo, in questi esempi viene dimostrato EventHubProducerClient
, ma il concetto e la forma sono comuni nei client.
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
string[] partitionIds = await producer.GetPartitionIdsAsync();
}
Pubblicare eventi in Hub eventi
Per pubblicare eventi, è necessario creare un oggetto EventHubProducerClient
. I producer pubblicano gli eventi in batch e potrebbero richiedere una partizione specifica o consentire al servizio Hub eventi di decidere in quali eventi di partizione pubblicare. È consigliabile usare il routing automatico quando la pubblicazione degli eventi deve essere a disponibilità elevata o quando i dati degli eventi devono essere distribuiti in modo uniforme tra le partizioni. L'esempio usa il routing automatico.
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
using EventDataBatch eventBatch = await producer.CreateBatchAsync();
eventBatch.TryAdd(new EventData(new BinaryData("First")));
eventBatch.TryAdd(new EventData(new BinaryData("Second")));
await producer.SendAsync(eventBatch);
}
Leggere gli eventi da un Hub eventi
Per leggere gli eventi da un Hub eventi, è necessario creare un oggetto EventHubConsumerClient
per un determinato gruppo di consumer. Quando viene creato un Hub eventi, include un gruppo di consumer predefinito che può essere usato per iniziare a esplorare Hub eventi. In questo esempio, ci si concentra sulla lettura di tutti gli eventi pubblicati in Hub eventi usando un iteratore.
Nota
È importante notare che questo approccio all'utilizzo è destinato a migliorare l'esperienza di esplorazione della libreria client e della creazione di prototipi dell'hub eventi. È consigliabile non usarlo negli scenari di produzione. Per l'uso in produzione, è consigliabile usare il client processore di eventi, che offre un'esperienza più affidabile e con migliori prestazioni.
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
{
// At this point, the loop will wait for events to be available in the Event Hub. When an event
// is available, the loop will iterate with the event that was received. Because we did not
// specify a maximum wait time, the loop will wait forever unless cancellation is requested using
// the cancellation token.
}
}
Leggere gli eventi da una partizione di Hub eventi
Per leggere da una partizione specifica, l'utente deve specificare nel flusso di eventi dove iniziare a ricevere gli eventi. In questo esempio, ci si concentra sulla lettura di tutti gli eventi pubblicati per la prima partizione di Hub eventi.
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
EventPosition startingPosition = EventPosition.Earliest;
string partitionId = (await consumer.GetPartitionIdsAsync()).First();
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
{
// At this point, the loop will wait for events to be available in the partition. When an event
// is available, the loop will iterate with the event that was received. Because we did not
// specify a maximum wait time, the loop will wait forever unless cancellation is requested using
// the cancellation token.
}
}
Elaborare eventi usando un client processore di eventi
Per la maggior parte degli scenari di produzione, è consigliabile usare EventProcessorClient
per la lettura e l'elaborazione di eventi. Poiché l'oggetto EventProcessorClient
ha una dipendenza dai BLOB di Archiviazione di Azure per la persistenza dello stato, sarà necessario specificare un oggetto BlobContainerClient
per il processore, configurato per l'account di archiviazione e il contenitore da usare.
var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";
Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;
var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
await processor.StartProcessingAsync();
try
{
// The processor performs its work in the background; block until cancellation
// to allow processing to take place.
await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
// This is expected when the delay is canceled.
}
try
{
await processor.StopProcessingAsync();
}
finally
{
// To prevent leaks, the handlers should be removed when processing is complete.
processor.ProcessEventAsync -= processEventHandler;
processor.ProcessErrorAsync -= processErrorHandler;
}