Unauthorized access. 'Listen' claim(s) are required - Azure function using RUST (event hub)

Théo Ladal 0 Points de réputation
2024-11-27T13:09:05.3333333+00:00

I'm getting a weird issue while trying to run (locally, and on Azure) my Azure Func. I got this :


[2024-11-27T08:47:28.789Z] Error receiving event: Error { context: Custom(Custom { kind: Other, error: Fe2o3 Error: Receiver attach error RemoteClosedWithError(Error { condition: AmqpError(UnauthorizedAccess), description: Some("Unauthorized access. 'Listen' claim(s) are required to perform this operation. Resource: 'sb://backendiothub.servicebus.windows.net/backendiothubns/consumergroups/backendiotcg/partitions/0'. TrackingId:f5b003e1e2c54481adbcafaebc948090_G30, SystemTracker:gateway5, Timestamp:2024-11-27T08:47:28"), info: None }) }) }

But I'm pretty sure, I've granted full access to my func :

1/Azure portal - Role assignmentImage de l’utilisateur

2/Azure portal - Event Hub Namespace

Image de l’utilisateur

3/Azure portal - Environment variables

Image de l’utilisateur

Here is the code :

config.rs :


use std::{fmt, sync::Arc};

use anyhow::Result;
use azure_identity::DefaultAzureCredential;
use azure_messaging_eventhubs::consumer::ConsumerClient;
use figment::{
    providers::Env,
    Figment,
};
use serde::Deserialize;
use tracing::debug;

/// Structure contaning the config of the Azure EventHub.
#[derive(Deserialize, Clone, Default)]
pub struct EventHub {
    host: String,
    name: String,
    #[serde(alias="consumergroup")]
    consumer_group: String,
    #[serde(skip)]
    pub client: Option<Arc<ConsumerClient>>,
}

impl fmt::Debug for EventHub {
/// Custom Debug implementation as ConsumerClient do not implement it.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventHub")
            .field("host", &self.host)
            .field("name", &self.name)
            .field("consumer_group", &self.name)
            .finish_non_exhaustive()
    }
}

/// Structure containing the configuaration of the application.
/// derive:
///     - Deserialize: can deserialize data to this structure.
///     - Clone: can be explicitly cloned if needed.
///     - Default: can be initialized empty. (mostly for testing)
///     - Debug: can be printed in debug output.
/// all the sub structure must implement or derive those trait;
#[derive(Deserialize, Clone, Default, Debug)]
pub struct Config {
    pub eventhub: EventHub,
}

impl Config {
    /// Create a new Config from the environment.
    ///
    /// It use the folowing environment variable:
    ///     - EVENTHUB_HOST (fully qualified namespace of the EventHub)
    ///     - EVENTHUB_NAME (name of the EventHub)
    /// the funcion then generates the credential from the default azure
    /// environment variables and use it to create a client that is added to
    /// the config.
    pub fn new() -> Result<Self> {
        let mut config: Config = Figment::new()
            .merge(
                Env::raw()
                .filter(|k| k.starts_with("EVENTHUB_"))
                .split("_")
            ).extract()?;
        let credential = DefaultAzureCredential::new()?;
        let client = ConsumerClient::new(
            config.eventhub.host.clone(),
            config.eventhub.name.clone(),
            Some(config.eventhub.consumer_group.clone()),
            credential,
            None,
        );
        config.eventhub.client = Some(Arc::new(client));
        debug!("Config: {config:?}");
        Ok(config)
    }
}

main.rs :


use anyhow::{bail, Result};
use tracing::info;
use futures::{pin_mut, StreamExt};
use tracing_subscriber::{fmt, EnvFilter};

mod config;
use config::Config;

async fn receive_events(config: &Config) -> Result<()>{
    let Some(ref client) = config.eventhub.client else {
        bail!("EventHub not correctly initialized!")
    };
    client.open().await.unwrap();
    info!("Connected to the event hub.");
    let event_stream = client
        .receive_events_on_partition(
            "0".to_string(),
            Some(
                azure_messaging_eventhubs::consumer::ReceiveOptions{
                    start_position: Some(azure_messaging_eventhubs::consumer::StartPosition{
                        location: azure_messaging_eventhubs::consumer::StartLocation::Earliest,
                        ..Default::default()
                    }),
                    ..Default::default()
                },
            ))
        .await;

    pin_mut!(event_stream);
    while let Some(event_result) = event_stream.next().await {
        match event_result {
            Ok(event) => {
                // Process the received event
                println!("Received event: {:?}", event);
            }
            Err(err) => {
                // Handle the error
                eprintln!("Error receiving event: {:?}", err);
            }
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    fmt()
        .with_target(false)
        .with_level(true)
        .with_env_filter(EnvFilter::from_default_env())
        .init();
    let config = Config::new()?;
    receive_events(&config).await?;
    println!("funcion terminated");
    Ok(())
}

The lib I use : https://github.com/Azure/azure-sdk-for-rust/blob/main/sdk/eventhubs/azure_messaging_eventhubs/README.md

Any help is more than welcome. Thanks in advance

Azure
Azure
Plateforme et infrastructure de cloud computing pour la génération, le déploiement et la gestion d’applications et de services à travers un réseau mondial de centres de données gérés par Microsoft.
318 questions
0 commentaires Aucun commentaire
{count} votes

Votre réponse

Les réponses peuvent être marquées comme Réponses acceptées par l’auteur de la question, ce qui permet aux utilisateurs de connaître la réponse qui a résolu le problème de l’auteur.