Event Hubs istemci kitaplığıyla ortak işlemler gerçekleştirme

Tamamlandı

Bu ünitede Event Hubs istemci kitaplığıAzure.Messaging.EventHubs () ile bir Event Hubs ile etkileşime geçmek için gerçekleştirebileceğiniz yaygın işlemlerin örnekleri yer alır.

Event Hubs'ı inceleme

Birçok Event Hubs işlemi belirli bir bölüm kapsamında gerçekleşir. Bölümler Event Hubs'a ait olduğundan, bölümlerin adları oluşturma sırasında atanır. Hangi bölümlerin kullanılabilir olduğunu anlamak için Event Hubs istemcilerinden birini kullanarak Event Hubs'ı sorgularsınız. Çizim için , EventHubProducerClient bu örneklerde gösterilmiştir, ancak kavram ve form istemciler arasında ortaktır.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Olayları Event Hubs'a yayımlama

Olayları yayımlamak için bir EventHubProducerClientoluşturmanız gerekir. Üreticiler olayları toplu olarak yayımlar ve belirli bir bölüm isteyebilir veya Event Hubs hizmetinin hangi bölüm olaylarında yayımlanması gerektiğine karar vermesine izin verebilir. Olayların yayımlanmasının yüksek oranda kullanılabilir olması gerektiğinde veya olay verilerinin bölümler arasında eşit bir şekilde dağıtılması gerektiğinde otomatik yönlendirme kullanmanızı öneririz. Örneğimiz otomatik yönlendirmeden yararlanır.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Event Hubs'dan olayları okuma

Event Hubs'dan olayları okumak için belirli bir tüketici grubu için bir EventHubConsumerClient oluşturmanız gerekir. Event Hubs oluşturulduğunda, Event Hubs'ı keşfetmeye başlamak için kullanılabilecek varsayılan bir tüketici grubu sağlar. Örneğimizde, yineleyici kullanarak Event Hubs'da yayımlanan tüm olayları okumaya odaklanacağız.

Dekont

Bu kullanım yaklaşımının Event Hubs istemci kitaplığını keşfetme ve prototip oluşturma deneyimini geliştirmeye yönelik olduğunu unutmayın. Üretim senaryolarında kullanılmaması önerilir. Üretim kullanımı için, daha sağlam ve yüksek performanslı bir deneyim sağladığından Olay İşlemcisi İstemcisi'ni kullanmanızı öneririz.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Event Hubs bölümünden olayları okuma

Belirli bir bölümden okumak için tüketicinin olay akışının neresinde olayları almaya başlayacağını belirtmesi gerekir; örneğimizde Event Hubs'ın ilk bölümü için yayımlanan tüm olayları okumaya odaklanıyoruz.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Olay İşlemcisi istemcisi kullanarak olayları işleme

Çoğu üretim senaryosunda EventProcessorClient , olayları okumak ve işlemek için kullanılması önerilir. EventProcessorClient durumunda kalıcılık için Azure Depolama bloblarına bağımlılığı olduğundan, kullanılacak depolama hesabı ve kapsayıcı için yapılandırılmış olan işlemci için bir BlobContainerClient sağlamanız gerekir.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}