Envoyer des événements à des hubs d’événements ou en recevoir de ces derniers à l’aide de Python

Ce guide de démarrage rapide montre comment recevoir des événements d’un hub d’événements et lui en envoyer à l’aide du package Python azure-eventhub.

Prérequis

Si vous débutez avec Azure Event Hubs, consultez la vue d’ensemble d’Event Hubs avant de suivre ce guide de démarrage rapide.

Pour effectuer ce démarrage rapide, vous avez besoin de ce qui suit :

  • Abonnement Microsoft Azure. Pour utiliser les services Azure, y compris Azure Event Hubs, vous avez besoin d’un abonnement. Si vous n’avez pas de compte Azure existant, inscrivez-vous pour obtenir un essai gratuit.
  • Python 3.8 ou version ultérieure, avec pip installé et à jour.
  • Visual Studio Code (recommandé) ou tout autre environnement de développement intégré (IDE).
  • Créez un espace de noms Event Hubs et un Event Hub. La première étape consiste à utiliser le portail Azure pour créer un espace de noms de type Event Hubs et obtenir les informations de gestion nécessaires à votre application pour communiquer avec le concentrateur d’événements. Pour créer un espace de noms et un hub d’événements, suivez la procédure décrite dans cet article.

Installer les packages pour envoyer des événements

Pour installer les packages Python pour Event Hubs, ouvrez une invite de commandes qui a Python dans son chemin. Remplacez le répertoire par le dossier dans lequel vous souhaitez conserver vos exemples.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Authentifier l’application sur Azure

Ce guide de démarrage rapide présente deux façons de vous connecter à Azure Event Hubs : sans mot de passe et avec une chaîne de connexion. La première option vous explique comment utiliser votre principal de sécurité dans Microsoft Entra ID et le contrôle d’accès en fonction du rôle (RBAC) pour vous connecter à un espace de noms Event Hubs. Vous n’avez pas à vous soucier d’avoir des chaînes de connexion codées en dur dans votre code, dans un fichier config ni dans un stockage sécurisé comme Azure Key Vault. La deuxième option consiste à se servir d’une chaîne de connexion pour se connecter à un espace de noms Event Hubs. Si vous débutez avec Azure, vous trouverez peut-être l’option de chaîne de connexion plus facile à suivre. Nous vous recommandons d’utiliser l’option sans mot de passe dans les applications réelles et les environnements de production. Pour plus d’informations, consultez Authentification et autorisation. Pour en savoir plus sur l’authentification sans mot de passe, reportez-vous à la page de présentation.

Attribuer des rôles à votre utilisateur Microsoft Entra

Lors du développement localement, vous devez vérifier que le compte d’utilisateur qui se connecte à Azure Event Hubs dispose des autorisations appropriées. Vous aurez besoin du rôle Propriétaire de données Azure Event Hubs pour envoyer et recevoir des messages. Pour vous attribuer ce rôle, vous aurez besoin du rôle Administrateur de l’accès utilisateur ou d’un autre rôle qui inclut l’action Microsoft.Authorization/roleAssignments/write. Vous pouvez attribuer des rôles RBAC Azure à un utilisateur à l’aide du Portail Azure, Azure CLI ou Azure PowerShell. Découvrez les étendues disponibles pour les attributions de rôles dans la page vue d’ensemble de l’étendue.

L’exemple suivant attribue le rôle Azure Event Hubs Data Owner à votre compte d’utilisateur, qui fournit un accès complet aux ressources Azure Event Hubs. Dans un scénario réel, suivez le principe des privilèges minimum pour accorder aux utilisateurs uniquement les autorisations minimales nécessaires à un environnement de production plus sécurisé.

Rôles intégrés Azure pour Azure Event Hubs

Pour Azure Event Hubs, la gestion des espaces de noms et de toutes les ressources associées via le portail Azure et l’API de gestion des ressources Azure est déjà protégée à l’aide du modèle RBAC Azure. Azure fournit les rôles Azure intégrés ci-dessous pour autoriser l’accès à un espace de noms Event Hubs :

Si vous souhaitez créer un rôle personnalisé, consultez Droits requis pour les opérations Service Bus.

Important

Dans la plupart des cas, la propagation de l’attribution de rôle dans Azure peut prendre une ou deux minutes. Dans de rares cas, cela peut prendre jusqu’à huit minutes. Si vous recevez des erreurs d’authentification lorsque vous exécutez votre code pour la première fois, patientez quelques instants et réessayez.

  1. Dans le portail Azure, recherchez votre espace de noms Event Hubs à l’aide de la barre de recherche principale ou de la navigation gauche.

  2. Dans la page vue d’ensemble, sélectionnez Contrôle d’accès (IAM) dans le menu de gauche.

  3. Sur la page Contrôle d’accès (IAM), sélectionnez l’onglet Attributions de rôles.

  4. Sélectionnez + Ajouter dans le menu supérieur, puis Ajouter une attribution de rôle dans le menu déroulant résultant.

    A screenshot showing how to assign a role.

  5. Utilisez la zone de recherche pour filtrer les résultats sur le rôle souhaité. Pour cet exemple, recherchez Azure Event Hubs Data Owner et sélectionnez le résultat correspondant. Ensuite, choisissez Suivant.

  6. Sous Attribuer l’accès à, sélectionnez Utilisateur, groupe ou principal de service, puis sélectionnez + Sélectionner des membres.

  7. Dans la boîte de dialogue, recherchez votre nom d’utilisateur Microsoft Entra (généralement votre adresse e-mail utilisateur@domaine), puis choisissez Sélectionner en bas de la boîte de dialogue.

  8. Sélectionnez Vérifier + affecter pour accéder à la page finale, puis Vérifier + attribuer à nouveau pour terminer le processus.

Envoyer des événements

Dans cette section, vous allez créer un script Python pour envoyer des événements au hub d’événements que vous avez créé.

  1. Ouvrez votre éditeur Python favori, tel que Visual Studio Code.

  2. Créez un script appelé send.py. Ce script envoie un lot d’événements au hub d’événements que vous avez créé.

  3. Collez le code suivant dans send.py :

    Dans le code, utilisez des valeurs réelles pour remplacer les espaces réservés suivants :

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Notes

    Pour obtenir des exemples d’autres options d’envoi d’événements à Event Hub de manière asynchrone à l’aide d’une chaîne de connexion, consultez la page GitHub send_async.py. Les modèles indiqués s’appliquent également à l’envoi d’événements sans mot de passe.

Recevoir des événements

Ce guide de démarrage rapide utilise un stockage Blob Azure comme magasin de points de contrôle. Le magasin de points de contrôle est utilisé pour conserver les points de contrôle (autrement dit, les dernières positions de lecture).

Suivez les recommandations ci-dessous quand vous utilisez Stockage Blob Azure comme magasin de points de contrôle :

  • Utilisez un conteneur distinct pour chaque groupe de consommateurs. Vous pouvez utiliser le même compte de stockage, mais utiliser un conteneur par groupe.
  • N’utilisez pas le conteneur et le compte de stockage pour quoi que ce soit d’autre.
  • Le compte de stockage doit se trouver dans la même région que l’application déployée. Si l’application est locale, essayez de choisir la région la plus proche possible.

Sur la page Compte de stockage du Portail Azure, dans la section Service BLOB, vérifiez que les paramètres suivants sont désactivés.

  • Espace de noms hiérarchique
  • Suppression réversible de blob
  • Contrôle de version

Créer un compte de stockage Azure et un conteneur d’objets blob

Créez un compte de stockage Azure et un conteneur d’objets blob dans celui-ci en effectuant les étapes suivantes :

  1. Création d’un compte de stockage Azure
  2. Créez un conteneur d’objets blob.
  3. S’authentifier auprès du conteneur d’objets blob.

Veillez à enregistrer la chaîne de connexion et le nom du conteneur en vue de les utiliser dans le code de réception.

Lors du développement local, assurez-vous que le compte d’utilisateur qui accède aux données blob dispose des autorisations appropriées. Vous aurez besoin du Contributeur aux données Blob de stockage pour lire et écrire des données blob. Pour vous attribuer ce rôle, vous aurez besoin du rôle Administrateur de l’accès utilisateur ou d’un autre rôle qui inclut l’action Microsoft.Authorization/roleAssignments/write. Vous pouvez attribuer des rôles RBAC Azure à un utilisateur à l’aide du Portail Azure, Azure CLI ou Azure PowerShell. Vous pouvez en savoir plus sur les étendues disponibles pour les attributions de rôles dans la page vue d’ensemble de l’étendue .

Dans ce scénario, vous allez attribuer des autorisations à votre compte d’utilisateur, étendues au compte de stockage, pour suivre le Principe des privilèges minimum. Cette pratique offre aux utilisateurs uniquement les autorisations minimales nécessaires et crée des environnements de production plus sécurisés.

L’exemple suivant affecte le rôle Contributeur aux données Blob du stockage à votre compte d’utilisateur, qui fournit à la fois un accès en lecture et en écriture aux données d’objet blob dans votre compte de stockage.

Important

Dans la plupart des cas, la propagation de l’attribution de rôle dans Azure peut prendre une ou deux minutes, mais dans de rares cas, cela peut prendre jusqu’à huit minutes. Si vous recevez des erreurs d’authentification lorsque vous exécutez votre code pour la première fois, patientez quelques instants et réessayez.

  1. Dans le Portail Azure, recherchez votre compte de stockage à l’aide de la barre de recherche principale ou de la navigation gauche.

  2. Dans la page vue d’ensemble du compte de stockage, sélectionnez Contrôle d’accès (IAM) dans le menu de gauche.

  3. Sur la page Contrôle d’accès (IAM), sélectionnez l’onglet Attributions de rôles.

  4. Sélectionnez + Ajouter dans le menu supérieur, puis Ajouter une attribution de rôle dans le menu déroulant résultant.

    A screenshot showing how to assign a storage account role.

  5. Utilisez la zone de recherche pour filtrer les résultats sur le rôle souhaité. Pour cet exemple, recherchez Contributeur aux données Blob du stockage, sélectionnez le résultat correspondant, puis choisissez Suivant.

  6. Sous Attribuer l’accès à, sélectionnez Utilisateur, groupe ou principal de service, puis sélectionnez + Sélectionner des membres.

  7. Dans la boîte de dialogue, recherchez votre nom d’utilisateur Microsoft Entra (généralement votre adresse e-mail utilisateur@domaine), puis choisissez Sélectionner en bas de la boîte de dialogue.

  8. Sélectionnez Vérifier + affecter pour accéder à la page finale, puis Vérifier + attribuer à nouveau pour terminer le processus.

Installer les packages pour recevoir des événements

Pour le côté récepteur, vous avez besoin d’installer un autre package, ou plus. Dans ce guide de démarrage rapide, vous utilisez le stockage Blob Azure pour conserver les points de contrôle afin que le programme ne lise pas les événements qu’il a déjà lus. Il effectue des points de contrôle de métadonnées sur les messages reçus à intervalles réguliers dans un blob. Cette approche permet ultérieurement de continuer à recevoir des messages à partir du point où vous vous étiez arrêté.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Créer un script Python pour recevoir des événements

Dans cette section, vous allez créer un script Python pour recevoir des événements de votre Event Hub :

  1. Ouvrez votre éditeur Python favori, tel que Visual Studio Code.

  2. Créez un script appelé recv.py.

  3. Collez le code suivant dans recv.py :

    Dans le code, utilisez des valeurs réelles pour remplacer les espaces réservés suivants :

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Notes

    Pour obtenir des exemples d’autres options permettant de recevoir des événements d’Event Hub de manière asynchrone à l’aide d’une chaîne de connexion, consultez la page GitHub recv_with_checkpoint_store_async.py. Les modèles indiqués s’appliquent également à la réception d’événements sans mot de passe.

Exécuter l’application réceptrice

Pour exécuter le script, ouvrez une invite de commandes qui a Python dans son chemin, puis exécutez cette commande :

python recv.py

Exécuter l’application de l’expéditeur

Pour exécuter le script, ouvrez une invite de commandes qui a Python dans son chemin, puis exécutez cette commande :

python send.py

La fenêtre réceptrice doit afficher les messages qui ont été envoyés au hub d’événements.

Dépannage

Si vous ne voyez pas d’événements dans la fenêtre du récepteur ou si le code signale une erreur, essayez les conseils de dépannage suivants :

  • Si vous ne voyez pas les résultats de recy.py, exécutez send.py plusieurs fois.

  • Si vous voyez des erreurs concernant « coroutine » lors de l’utilisation du code sans mot de passe (avec des informations d’identification), vérifiez que vous utilisez l’importation à partir de azure.identity.aio.

  • Si vous voyez « Session client non fermée » avec du code sans mot de passe (avec des informations d’identification), veillez à fermer les informations d’identification lorsque vous avez terminé. Pour plus d’informations, consultez Informations asynchrones.

  • Si vous voyez des erreurs d’autorisation avec recv.py lors de l’accès au stockage, vérifiez que vous avez suivi les étapes décrites dans Créer un compte de stockage Azure et un conteneur d’objets blob et que vous avez attribué le rôle Contributeur aux données blob de stockage au principal de service.

  • Si vous recevez des événements avec des ID de partition différents, ce résultat est attendu. Les partitions constituent un mécanisme d’organisation des données. Elles sont liées au degré de parallélisme en aval requis lors de la consommation des applications. Le choix du nombre de partitions dans un concentrateur d’événements est directement lié au nombre de lecteurs simultanés que vous prévoyez d’avoir. Pour plus d’informations, consultez En savoir plus sur les partitions.

Étapes suivantes

Dans ce guide de démarrage rapide, vous avez envoyé et reçu des événements de manière asynchrone. Pour savoir comment envoyer et recevoir des événements de manière synchrone, accédez à la page GitHub sync_samples.

Pour tous les exemples (synchrones comme asynchrones) sur GitHub, accédez à la bibliothèque cliente Azure Event Hubs des exemples Python.