I'm getting a weird issue while trying to run (locally, and on Azure) my Azure Func. I got this :
[2024-11-27T08:47:28.789Z] Error receiving event: Error { context: Custom(Custom { kind: Other, error: Fe2o3 Error: Receiver attach error RemoteClosedWithError(Error { condition: AmqpError(UnauthorizedAccess), description: Some("Unauthorized access. 'Listen' claim(s) are required to perform this operation. Resource: 'sb://backendiothub.servicebus.windows.net/backendiothubns/consumergroups/backendiotcg/partitions/0'. TrackingId:f5b003e1e2c54481adbcafaebc948090_G30, SystemTracker:gateway5, Timestamp:2024-11-27T08:47:28"), info: None }) }) }
But I'm pretty sure, I've granted full access to my func :
1/Azure portal - Role assignment
2/Azure portal - Event Hub Namespace
3/Azure portal - Environment variables
Here is the code :
config.rs :
use std::{fmt, sync::Arc};
use anyhow::Result;
use azure_identity::DefaultAzureCredential;
use azure_messaging_eventhubs::consumer::ConsumerClient;
use figment::{
providers::Env,
Figment,
};
use serde::Deserialize;
use tracing::debug;
/// Structure contaning the config of the Azure EventHub.
#[derive(Deserialize, Clone, Default)]
pub struct EventHub {
host: String,
name: String,
#[serde(alias="consumergroup")]
consumer_group: String,
#[serde(skip)]
pub client: Option<Arc<ConsumerClient>>,
}
impl fmt::Debug for EventHub {
/// Custom Debug implementation as ConsumerClient do not implement it.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EventHub")
.field("host", &self.host)
.field("name", &self.name)
.field("consumer_group", &self.name)
.finish_non_exhaustive()
}
}
/// Structure containing the configuaration of the application.
/// derive:
/// - Deserialize: can deserialize data to this structure.
/// - Clone: can be explicitly cloned if needed.
/// - Default: can be initialized empty. (mostly for testing)
/// - Debug: can be printed in debug output.
/// all the sub structure must implement or derive those trait;
#[derive(Deserialize, Clone, Default, Debug)]
pub struct Config {
pub eventhub: EventHub,
}
impl Config {
/// Create a new Config from the environment.
///
/// It use the folowing environment variable:
/// - EVENTHUB_HOST (fully qualified namespace of the EventHub)
/// - EVENTHUB_NAME (name of the EventHub)
/// the funcion then generates the credential from the default azure
/// environment variables and use it to create a client that is added to
/// the config.
pub fn new() -> Result<Self> {
let mut config: Config = Figment::new()
.merge(
Env::raw()
.filter(|k| k.starts_with("EVENTHUB_"))
.split("_")
).extract()?;
let credential = DefaultAzureCredential::new()?;
let client = ConsumerClient::new(
config.eventhub.host.clone(),
config.eventhub.name.clone(),
Some(config.eventhub.consumer_group.clone()),
credential,
None,
);
config.eventhub.client = Some(Arc::new(client));
debug!("Config: {config:?}");
Ok(config)
}
}
main.rs :
use anyhow::{bail, Result};
use tracing::info;
use futures::{pin_mut, StreamExt};
use tracing_subscriber::{fmt, EnvFilter};
mod config;
use config::Config;
async fn receive_events(config: &Config) -> Result<()>{
let Some(ref client) = config.eventhub.client else {
bail!("EventHub not correctly initialized!")
};
client.open().await.unwrap();
info!("Connected to the event hub.");
let event_stream = client
.receive_events_on_partition(
"0".to_string(),
Some(
azure_messaging_eventhubs::consumer::ReceiveOptions{
start_position: Some(azure_messaging_eventhubs::consumer::StartPosition{
location: azure_messaging_eventhubs::consumer::StartLocation::Earliest,
..Default::default()
}),
..Default::default()
},
))
.await;
pin_mut!(event_stream);
while let Some(event_result) = event_stream.next().await {
match event_result {
Ok(event) => {
// Process the received event
println!("Received event: {:?}", event);
}
Err(err) => {
// Handle the error
eprintln!("Error receiving event: {:?}", err);
}
}
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
fmt()
.with_target(false)
.with_level(true)
.with_env_filter(EnvFilter::from_default_env())
.init();
let config = Config::new()?;
receive_events(&config).await?;
println!("funcion terminated");
Ok(())
}
The lib I use : https://github.com/Azure/azure-sdk-for-rust/blob/main/sdk/eventhubs/azure_messaging_eventhubs/README.md
Any help is more than welcome. Thanks in advance