Algemene bewerkingen uitvoeren met de Event Hubs-clientbibliotheek

Voltooid

Deze les bevat voorbeelden van veelvoorkomende bewerkingen die u kunt uitvoeren met de Event Hubs-clientbibliotheek (Azure.Messaging.EventHubs) om te communiceren met een Event Hubs.

Event Hubs inspecteren

Veel Event Hubs-bewerkingen vinden plaats binnen het bereik van een specifieke partitie. Omdat Event Hubs eigenaar is van de partities, worden hun namen toegewezen op het moment van maken. Als u wilt weten welke partities beschikbaar zijn, voert u een query uit op de Event Hubs met behulp van een van de Event Hubs-clients. Ter illustratie wordt het EventHubProducerClient in deze voorbeelden gedemonstreerd, maar het concept en de vorm zijn gebruikelijk voor clients.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Gebeurtenissen publiceren naar Event Hubs

Als u gebeurtenissen wilt publiceren, moet u een EventHubProducerClient. Producenten publiceren gebeurtenissen in batches en kunnen een specifieke partitie aanvragen of de Event Hubs-service toestaan om te bepalen naar welke partitiegebeurtenissen moeten worden gepubliceerd. We raden u aan automatische routering te gebruiken wanneer het publiceren van gebeurtenissen maximaal beschikbaar moet zijn of wanneer gebeurtenisgegevens gelijkmatig tussen de partities moeten worden gedistribueerd. In ons voorbeeld wordt gebruikgemaakt van automatische routering.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Gebeurtenissen van een Event Hubs lezen

Als u gebeurtenissen van een Event Hubs wilt lezen, moet u een EventHubConsumerClient voor een bepaalde consumentengroep maken. Wanneer een Event Hubs wordt gemaakt, biedt deze een standaardconsumentengroep die kan worden gebruikt om aan de slag te gaan met het verkennen van Event Hubs. In ons voorbeeld richten we ons op het lezen van alle gebeurtenissen die zijn gepubliceerd naar de Event Hubs met behulp van een iterator.

Notitie

Het is belangrijk te weten dat deze benadering van verbruik is bedoeld om de ervaring van het verkennen van de Event Hubs-clientbibliotheek en het maken van prototypen te verbeteren. Het wordt aanbevolen om deze niet te gebruiken in productiescenario's. Voor productiegebruik raden we u aan de Event Processor-client te gebruiken, omdat deze een krachtigere en performantere ervaring biedt.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Gebeurtenissen lezen uit een Event Hubs-partitie

Voor het lezen van een specifieke partitie moet de consument opgeven waar in de gebeurtenisstroom gebeurtenissen moeten worden ontvangen. In ons voorbeeld richten we ons op het lezen van alle gepubliceerde gebeurtenissen voor de eerste partitie van de Event Hubs.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition. When an event
        // is available, the loop will iterate with the event that was received. Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Gebeurtenissen verwerken met behulp van een Event Processor-client

Voor de meeste productiescenario's is het raadzaam EventProcessorClient gebeurtenissen te lezen en te verwerken. Omdat de EventProcessorClient blobs afhankelijk zijn van Azure Storage-blobs voor persistentie van de status, moet u een BlobContainerClient processor opgeven die is geconfigureerd voor het opslagaccount en de container die moeten worden gebruikt.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}