Wanted: Help getting Event Hub "Hello World" Working with Partitions

Siegfried Heintze 1,861 Reputation points
2022-11-07T23:42:21.823+00:00

I was hoping to get the sample code from 6-event-hubs-programming-guide working with Visual Studio because this looked like the easiest example of C# Event hubs I could find (that was accompanied with nice explanations). However, I could not make it work after creating an event hub namespace & event hub & event hub authorization rule & storage account. When I compiled and ran the code from the heading "Publish events to an Event Hub" and then ran the code from the heading "Read events from an Event Hub" I was expecting to see the events appear and nothing happens...

I also tried adding a storage account and running the code under heading "Process events using an Event Processor client" after publishing the events again and nothing but my diagnostic WriteLine statements appeared.

So I have tried this second example Sample01_HelloWorld.md using C# script and bicep. I'm expecting that when I run publish.csx and read.csx at the same time, read.csx should display "Event Number: 1", "Event Number 2"... etc and this is not happening when I run both programs. I'm not getting errors, however. (same as before).

Questions:

  1. Please help me understand what I am doing wrong.
  2. When is it necessary (or advantageous) to use blob storage with Azure.Messaging.EventHubs.Processor.EventProcessorClient instead of the much simpler EventHubConsumerClient?

So when I get this working, I'll experiment with explicit partitions and the default consumer group.

Thanks

Siegfried

publish.csx:

#!/usr/bin/env dotnet-script  
#r "nuget: Azure.Messaging.EventHubs, 5.7.3"  
using Azure.Messaging.EventHubs;  
using Azure.Messaging.EventHubs.Producer;      
using Azure.Messaging.EventHubs.Consumer;      
using static System.Console;  
using static System.Environment;  
  
// https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample01_HelloWorld.md      
  
var connectionString = GetEnvironmentVariable("eventHubNamespaceConnectionString");  
var eventHubName = GetEnvironmentVariable("eventHubName");  
var consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;  
var producer = new EventHubProducerClient(connectionString, eventHubName);  
var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);  
var max = 30000; // int.MaxValue  
try  
{  
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();  
  
    for (var counter = 0; counter < max; ++counter)  
    {  
        var s = $"Event Number: { counter }";  
        var eventBody = new BinaryData(s);  
        var eventData = new EventData(eventBody);  
  
        if (eventBatch.TryAdd(eventData))  
        {  
            if(counter % 100==0 )  
                WriteLine($"Added {s}");  
        }  
        else{  
            // At this point, the batch is full but our last event was not  
            // accepted.  For our purposes, the event is unimportant so we  
            // will intentionally ignore it.  In a real-world scenario, a  
            // decision would have to be made as to whether the event should  
            // be dropped or published on its own.  
            WriteLine($"add failed: {s}");  
            break;  
        }  
    }  
  
    // When the producer publishes the event, it will receive an  
    // acknowledgment from the Event Hubs service; so long as there is no  
    // exception thrown by this call, the service assumes responsibility for  
    // delivery.  Your event data will be published to one of the Event Hub  
    // partitions, though there may be a (very) slight delay until it is  
    // available to be consumed.  
  
    await producer.SendAsync(eventBatch);  
}  
catch  
{  
    // Transient failures will be automatically retried as part of the  
    // operation. If this block is invoked, then the exception was either  
    // fatal or all retries were exhausted without a successful publish.  
}  
finally  
{  
   await producer.CloseAsync();  
}  

read.csx:
#!/usr/bin/env dotnet-script
#r "nuget: Azure.Messaging.EventHubs, 5.7.3"
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Azure.Messaging.EventHubs.Consumer;
using static System.Console;
using static System.Environment;
using System.Threading;

var connectionString = GetEnvironmentVariable("eventHubNamespaceConnectionString");  
var eventHubName = GetEnvironmentVariable("eventHubName");  
var consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;  
var producer = new EventHubProducerClient(connectionString, eventHubName);  
var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);  
try  
{  
    // To ensure that we do not wait for an indeterminate length of time, we'll  
    // stop reading after we receive five events.  For a fresh Event Hub, those  
    // will be the first five that we had published.  We'll also ask for  
    // cancellation after 90 seconds, just to be safe.  
  
    using var cancellationSource = new CancellationTokenSource();  
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(90));  
  
    var maximumEvents = 5;  
    var eventDataRead = new List<string>();  
  
    await foreach (PartitionEvent partitionEvent in consumer.ReadEventsAsync(cancellationSource.Token))  
    {  
        eventDataRead.Add(partitionEvent.Data.EventBody.ToString());  
  
        if (eventDataRead.Count >= maximumEvents)  
        {  
            break;  
        }  
    }  
  
    // At this point, the data sent as the body of each event is held  
    // in the eventDataRead set.  
}  
catch  
{  
    // Transient failures will be automatically retried as part of the  
    // operation. If this block is invoked, then the exception was either  
    // fatal or all retries were exhausted without a successful read.  
}  
finally  
{  
   await consumer.CloseAsync();  
}  

deploy-EventHubDemo.bicep:

 // https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-bicep-namespace-event-hub?tabs=CLI  
 @description('Specifies a project name that is used to generate the Event Hub name and the Namespace name.')  
 param projectName string ='${uniqueString(resourceGroup().name)}-eventhubdemo'  
   
 @description('Specifies the Azure location for all resources.')  
 param location string = resourceGroup().location  
   
 @description('Specifies the messaging tier for Event Hub Namespace.')  
 @allowed([  
   'Basic'  
   'Standard'  
 ])  
 param eventHubSku string = 'Basic'  
   
 var eventHubNamespaceName = '${projectName}ns'  
 var eventHubName = projectName  
   
 resource eventHubNamespace 'Microsoft.EventHub/namespaces@2021-11-01' = {  
   name: eventHubNamespaceName  
   location: location  
   sku: {  
     name: eventHubSku  
     tier: eventHubSku  
     capacity: 1  
   }  
   properties: {  
     isAutoInflateEnabled: false  
     maximumThroughputUnits: 0  
   }     
  }  
  
  resource eventHubAuthorization 'Microsoft.EventHub/namespaces/authorizationRules@2022-01-01-preview'={  
    parent: eventHubNamespace  
    name: 'AuthorizationRules'  
    properties: {  
      rights: [  
        'Listen'  
        'Send'  
      ]  
    }  
   }  
   
 resource eventHub 'Microsoft.EventHub/namespaces/eventhubs@2021-11-01' = {  
   parent: eventHubNamespace  
   name: eventHubName  
   properties: {  
     messageRetentionInDays: 1  
     partitionCount: 4  
   }  
 }  
   
  
 var eventHubNamespaceConnectionString = listKeys(eventHubAuthorization.id,eventHubAuthorization.apiVersion).primaryConnectionString  
  
 // Output our variables  
   
 output eventHubNamespaceConnectionString string = eventHubNamespaceConnectionString  
 output eventHubName string = eventHubName  
  
  
// from https://blog.johnnyreilly.com/2021/07/07/output-connection-strings-and-keys-from-azure-bicep  
  
var storageAccountName = 'stgeventhubdemo'  
  
resource eventHubNamespaceName_storageAccount 'Microsoft.Storage/storageAccounts@2021-02-01' = {  
  name: storageAccountName  
  location: location  
  sku: {  
    name: 'Standard_LRS'  
    tier: 'Standard'  
  }  
  kind: 'StorageV2'  
  properties: {  
    networkAcls: {  
      bypass: 'AzureServices'  
      defaultAction: 'Allow'  
    }  
    accessTier: 'Hot'  
    allowBlobPublicAccess: false  
    minimumTlsVersion: 'TLS1_2'  
    allowSharedKeyAccess: true  
  }  
}  
  
// create a container inside that storage account  
  
var blobContainerName = 'eventhubdemo'  
  
resource storageAccountName_default_containerName 'Microsoft.Storage/storageAccounts/blobServices/containers@2021-02-01' = {  
  name: '${storageAccountName}/default/${blobContainerName}'  
  dependsOn: [  
    eventHubNamespaceName_storageAccount  
  ]  
}  
  
// Determine our connection strings  
  
var blobStorageConnectionString       = 'DefaultEndpointsProtocol=https;AccountName=${eventHubNamespaceName_storageAccount.name};EndpointSuffix=${environment().suffixes.storage};AccountKey=${listKeys(eventHubNamespaceName_storageAccount.id, eventHubNamespaceName_storageAccount.apiVersion).keys[0].value}'  
  
// Output our variables  
  
output blobStorageConnectionString string = blobStorageConnectionString  
output blobContainerName string = blobContainerName  
Azure Event Hubs
Azure Event Hubs
An Azure real-time data ingestion service.
564 questions
{count} votes