Melakukan operasi umum dengan pustaka klien Event Hubs

Selesai

Unit ini berisi contoh operasi umum yang dapat Anda lakukan dengan pustaka klien Azure Event Hubs (Azure.Messaging.EventHubs) untuk berinteraksi dengan Azure Event Hubs.

Memeriksa Azure Event Hubs

Banyak operasi Azure Event Hubs berlangsung dalam cakupan partisi tertentu. Karena partisi dimiliki oleh Azure Event Hubs, namanya ditetapkan pada saat pembuatan. Untuk memahami partisi apa yang tersedia, Anda mengkueri Azure Event Hubs menggunakan salah satu klien Azure Event Hubs. Sebagai ilustrasi, EventHubProducerClient ditunjukkan dalam contoh ini, tetapi konsep dan bentuknya sudah umum di seluruh klien.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

Menerbitkan peristiwa ke Azure Event Hubs

Untuk menerbitkan peristiwa, Anda perlu membuat EventHubProducerClient. Produsen menerbitkan peristiwa dalam batch dan dapat meminta partisi tertentu atau memungkinkan layanan Event Hub memutuskan peristiwa partisi mana yang harus diterbitkan. Sebaiknya gunakan perutean otomatis ketika penerbitan peristiwa harus tersedia dalam jumlah besar atau ketika data peristiwa harus didistribusikan secara merata di antara partisi. Contoh kami memanfaatkan perutean otomatis.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

Membaca peristiwa dari Azure Event Hubs

Untuk membaca peristiwa dari Azure Event Hubs, Anda perlu membuat EventHubConsumerClient untuk grup konsumen tertentu. Saat Azure Event Hubs dibuat, Azure Event Hubs menyediakan grup konsumen default yang dapat digunakan untuk mulai menjelajahi Azure Event Hubs. Dalam contoh kami, kami akan fokus membaca semua peristiwa yang telah diterbitkan ke Azure Event Hubs menggunakan iterator.

Catatan

Perlu dicatat bahwa pendekatan untuk skenario konsumsi ini dimaksudkan untuk meningkatkan pengalaman penjelajahan pustaka klien Event Hubs dan pembuatan prototipe. Sebaiknya jangan gunakan pendekatan ini dalam skenario produksi. Untuk penggunaan produksi, sebaiknya gunakan Klien Prosesor peristiwa karena dapat memberikan pengalaman dengan performa yang lebih baik dan kuat.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Membaca peristiwa dari partisi Azure Event Hubs

Untuk membaca dari partisi tertentu, konsumen perlu menentukan di mana dalam aliran peristiwa untuk mulai menerima peristiwa; dalam contoh kami, kami berfokus pada membaca semua peristiwa yang diterbitkan untuk partisi pertama Azure Event Hubs.

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

Memproses peristiwa menggunakan klien Prosesor Acara

Untuk sebagian besar skenario produksi, disarankan agar EventProcessorClient digunakan untuk membaca dan memproses peristiwa. EventProcessorClient Karena memiliki dependensi pada blob Azure Storage untuk persistensi statusnya, Anda perlu menyediakan BlobContainerClient untuk prosesor, yang telah dikonfigurasi untuk akun penyimpanan dan kontainer yang harus digunakan.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}