Wanted: Help getting Event Hub "Hello World" Working with Partitions
I was hoping to get the sample code from 6-event-hubs-programming-guide working with Visual Studio because this looked like the easiest example of C# Event hubs I could find (that was accompanied with nice explanations). However, I could not make it work after creating an event hub namespace & event hub & event hub authorization rule & storage account. When I compiled and ran the code from the heading "Publish events to an Event Hub" and then ran the code from the heading "Read events from an Event Hub" I was expecting to see the events appear and nothing happens...
I also tried adding a storage account and running the code under heading "Process events using an Event Processor client" after publishing the events again and nothing but my diagnostic WriteLine statements appeared.
So I have tried this second example Sample01_HelloWorld.md using C# script and bicep. I'm expecting that when I run publish.csx and read.csx at the same time, read.csx should display "Event Number: 1", "Event Number 2"... etc and this is not happening when I run both programs. I'm not getting errors, however. (same as before).
Questions:
- Please help me understand what I am doing wrong.
- When is it necessary (or advantageous) to use blob storage with Azure.Messaging.EventHubs.Processor.EventProcessorClient instead of the much simpler EventHubConsumerClient?
So when I get this working, I'll experiment with explicit partitions and the default consumer group.
Thanks
Siegfried
publish.csx:
#!/usr/bin/env dotnet-script
#r "nuget: Azure.Messaging.EventHubs, 5.7.3"
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Azure.Messaging.EventHubs.Consumer;
using static System.Console;
using static System.Environment;
// https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample01_HelloWorld.md
var connectionString = GetEnvironmentVariable("eventHubNamespaceConnectionString");
var eventHubName = GetEnvironmentVariable("eventHubName");
var consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
var producer = new EventHubProducerClient(connectionString, eventHubName);
var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
var max = 30000; // int.MaxValue
try
{
using EventDataBatch eventBatch = await producer.CreateBatchAsync();
for (var counter = 0; counter < max; ++counter)
{
var s = $"Event Number: { counter }";
var eventBody = new BinaryData(s);
var eventData = new EventData(eventBody);
if (eventBatch.TryAdd(eventData))
{
if(counter % 100==0 )
WriteLine($"Added {s}");
}
else{
// At this point, the batch is full but our last event was not
// accepted. For our purposes, the event is unimportant so we
// will intentionally ignore it. In a real-world scenario, a
// decision would have to be made as to whether the event should
// be dropped or published on its own.
WriteLine($"add failed: {s}");
break;
}
}
// When the producer publishes the event, it will receive an
// acknowledgment from the Event Hubs service; so long as there is no
// exception thrown by this call, the service assumes responsibility for
// delivery. Your event data will be published to one of the Event Hub
// partitions, though there may be a (very) slight delay until it is
// available to be consumed.
await producer.SendAsync(eventBatch);
}
catch
{
// Transient failures will be automatically retried as part of the
// operation. If this block is invoked, then the exception was either
// fatal or all retries were exhausted without a successful publish.
}
finally
{
await producer.CloseAsync();
}
read.csx:
#!/usr/bin/env dotnet-script
#r "nuget: Azure.Messaging.EventHubs, 5.7.3"
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Azure.Messaging.EventHubs.Consumer;
using static System.Console;
using static System.Environment;
using System.Threading;
var connectionString = GetEnvironmentVariable("eventHubNamespaceConnectionString");
var eventHubName = GetEnvironmentVariable("eventHubName");
var consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
var producer = new EventHubProducerClient(connectionString, eventHubName);
var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
try
{
// To ensure that we do not wait for an indeterminate length of time, we'll
// stop reading after we receive five events. For a fresh Event Hub, those
// will be the first five that we had published. We'll also ask for
// cancellation after 90 seconds, just to be safe.
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(90));
var maximumEvents = 5;
var eventDataRead = new List<string>();
await foreach (PartitionEvent partitionEvent in consumer.ReadEventsAsync(cancellationSource.Token))
{
eventDataRead.Add(partitionEvent.Data.EventBody.ToString());
if (eventDataRead.Count >= maximumEvents)
{
break;
}
}
// At this point, the data sent as the body of each event is held
// in the eventDataRead set.
}
catch
{
// Transient failures will be automatically retried as part of the
// operation. If this block is invoked, then the exception was either
// fatal or all retries were exhausted without a successful read.
}
finally
{
await consumer.CloseAsync();
}
deploy-EventHubDemo.bicep:
// https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-bicep-namespace-event-hub?tabs=CLI
@description('Specifies a project name that is used to generate the Event Hub name and the Namespace name.')
param projectName string ='${uniqueString(resourceGroup().name)}-eventhubdemo'
@description('Specifies the Azure location for all resources.')
param location string = resourceGroup().location
@description('Specifies the messaging tier for Event Hub Namespace.')
@allowed([
'Basic'
'Standard'
])
param eventHubSku string = 'Basic'
var eventHubNamespaceName = '${projectName}ns'
var eventHubName = projectName
resource eventHubNamespace 'Microsoft.EventHub/namespaces@2021-11-01' = {
name: eventHubNamespaceName
location: location
sku: {
name: eventHubSku
tier: eventHubSku
capacity: 1
}
properties: {
isAutoInflateEnabled: false
maximumThroughputUnits: 0
}
}
resource eventHubAuthorization 'Microsoft.EventHub/namespaces/authorizationRules@2022-01-01-preview'={
parent: eventHubNamespace
name: 'AuthorizationRules'
properties: {
rights: [
'Listen'
'Send'
]
}
}
resource eventHub 'Microsoft.EventHub/namespaces/eventhubs@2021-11-01' = {
parent: eventHubNamespace
name: eventHubName
properties: {
messageRetentionInDays: 1
partitionCount: 4
}
}
var eventHubNamespaceConnectionString = listKeys(eventHubAuthorization.id,eventHubAuthorization.apiVersion).primaryConnectionString
// Output our variables
output eventHubNamespaceConnectionString string = eventHubNamespaceConnectionString
output eventHubName string = eventHubName
// from https://blog.johnnyreilly.com/2021/07/07/output-connection-strings-and-keys-from-azure-bicep
var storageAccountName = 'stgeventhubdemo'
resource eventHubNamespaceName_storageAccount 'Microsoft.Storage/storageAccounts@2021-02-01' = {
name: storageAccountName
location: location
sku: {
name: 'Standard_LRS'
tier: 'Standard'
}
kind: 'StorageV2'
properties: {
networkAcls: {
bypass: 'AzureServices'
defaultAction: 'Allow'
}
accessTier: 'Hot'
allowBlobPublicAccess: false
minimumTlsVersion: 'TLS1_2'
allowSharedKeyAccess: true
}
}
// create a container inside that storage account
var blobContainerName = 'eventhubdemo'
resource storageAccountName_default_containerName 'Microsoft.Storage/storageAccounts/blobServices/containers@2021-02-01' = {
name: '${storageAccountName}/default/${blobContainerName}'
dependsOn: [
eventHubNamespaceName_storageAccount
]
}
// Determine our connection strings
var blobStorageConnectionString = 'DefaultEndpointsProtocol=https;AccountName=${eventHubNamespaceName_storageAccount.name};EndpointSuffix=${environment().suffixes.storage};AccountKey=${listKeys(eventHubNamespaceName_storageAccount.id, eventHubNamespaceName_storageAccount.apiVersion).keys[0].value}'
// Output our variables
output blobStorageConnectionString string = blobStorageConnectionString
output blobContainerName string = blobContainerName