Freigeben über


Schnellstart: Senden von Ereignissen an oder Empfangen von Ereignissen von Event Hubs mithilfe von Python

In dieser Schnellstartanleitung erfahren Sie, wie Sie Ereignisse mithilfe des Azure-eventhub Python-Pakets an einen Event Hub senden und ereignisse von einem Event Hub empfangen.

Voraussetzungen

Wenn Sie mit Azure Event Hubs noch nicht vertraut sind, lesen Sie vor dem Durcharbeiten dieser Schnellstartanleitung die Informationen unter Übersicht über Event Hubs.

Um diese Schnellstartanleitung abzuschließen, stellen Sie sicher, dass Sie über die folgenden Voraussetzungen verfügen:

  • Microsoft Azure-Abonnement: Registrieren Sie sich für eine kostenlose Testversion , wenn Sie kein Abonnement haben.
  • Python 3.8 oder höher: Stellen Sie sicher, dass pip installiert und aktualisiert wird.
  • Visual Studio Code (empfohlen): Oder verwenden Sie eine andere IDE Ihrer Wahl.
  • Event Hubs-Namespace und Event Hub: Befolgen Sie dieses Handbuch , um sie im Azure-Portal zu erstellen.

Installieren der Pakete zum Senden von Ereignissen

Um die Python-Pakete für Event Hubs zu installieren, öffnen Sie eine Eingabeaufforderung mit Python im Pfad. Ändern Sie das Verzeichnis in den Ordner, in dem Sie Ihre Beispiele speichern möchten.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Authentifizieren der App bei Azure

In dieser Schnellstartanleitung werden zwei Möglichkeiten zum Herstellen einer Verbindung mit Azure Event Hubs gezeigt:

  • Kennwortlos. Verwenden Sie Ihren Sicherheitsprinzipal in Microsoft Entra ID und die rollenbasierte Zugriffssteuerung (RBAC), um eine Verbindung zu einem Event Hubs Namespace herzustellen. Sie müssen sich keine Gedanken darüber machen, dass hartcodierte Verbindungszeichenfolgen in Ihrem Code, in einer Konfigurationsdatei oder im sicheren Speicher wie Azure Key Vault vorhanden sind.
  • Verbindungszeichenfolge. Verwenden Sie eine Verbindungszeichenfolge, um eine Verbindung mit einem Event Hubs-Namespace herzustellen. Wenn Sie noch nicht mit Azure vertraut sind, ist die Option mit einer Verbindungszeichenfolge möglicherweise einfacher.

In realen Anwendungen und Produktionsumgebungen wird die kennwortlose Option empfohlen. Weitere Informationen finden Sie unter Service Bus-Authentifizierung und Autorisierung und kennwortloseVerbindungen für Azure-Dienste.

Zuweisen von Rollen zu Ihrem Microsoft Entra-Benutzer

Stellen Sie bei der lokalen Entwicklung sicher, dass das Benutzerkonto, das eine Verbindung mit Azure Event Hubs herstellt, über die richtigen Berechtigungen verfügt. Sie benötigen die Azure Event Hubs-Rolle "Datenbesitzer ", um Nachrichten zu senden und zu empfangen. Um sich selbst diese Rolle zuzuweisen, benötigen Sie die Rolle des Benutzerzugriffsadministrators oder eine andere Rolle, die die Microsoft.Authorization/roleAssignments/write Aktion enthält. Sie können einem Benutzer Azure RBAC-Rollen über das Azure-Portal, die Azure CLI oder mit Azure PowerShell zuweisen. Weitere Informationen finden Sie auf der Seite Umfang von Azure RBAC verstehen.

Im folgenden Beispiel wird Ihrem Benutzerkonto die Rolle Azure Event Hubs Data Owner zugewiesen. Diese Rolle bietet Vollzugriff auf Azure Event Hubs-Ressourcen. Halten Sie sich in einem echten Szenario an das Prinzip der geringsten Rechte, um Benutzern nur die benötigten Mindestberechtigungen zu erteilen und so die Produktionsumgebung besser zu schützen.

In Azure integrierte Rollen für Azure Event Hubs

Bei Azure Event Hubs ist die Verwaltung der Namespaces und aller zugehörigen Ressourcen über das Azure-Portal und die Azure-Ressourcenverwaltungs-API bereits durch das Azure RBAC-Modell geschützt. Azure bietet die folgenden integrierten Rollen zum Autorisieren des Zugriffs auf einen Event Hubs-Namespace:

Informationen zum Erstellen einer benutzerdefinierten Rolle finden Sie unter Erforderliche Rechte für Event Hubs-Vorgänge.

Wichtig

In der Regel dauert die Verteilung der Rollenzuweisung in Azure ein bis zwei Minuten. In seltenen Fällen kann es bis zu acht Minuten dauern. Wenn bei der ersten Ausführung Ihres Codes Authentifizierungsfehler auftreten, warten Sie einige Momente, und versuchen Sie es dann erneut.

  1. Navigieren Sie im Azure-Portal zu Ihrem Event Hubs-Namespace. Verwenden Sie dazu entweder die Hauptsuchleiste oder die linke Navigationsleiste.

  2. Wählen Sie auf der Übersichtsseite im linken Menü die Option Zugriffssteuerung (IAM) aus.

  3. Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Registerkarte Rollenzuweisungen aus.

  4. Wählen Sie im oberen Menü +Hinzufügen aus. Wählen Sie dann " Rollenzuweisung hinzufügen" aus.

    Screenshot, der zeigt, wie eine Rolle zugewiesen wird.

  5. Über das Suchfeld können Sie die Ergebnisse für die gewünschte Rolle filtern. Suchen Sie in diesem Beispiel nach Azure Event Hubs Data Owner, und wählen Sie das entsprechende Ergebnis aus. Klicken Sie dann auf Weiter.

  6. Wählen Sie unter Zugriff zuweisen für die Option Benutzer, Gruppe oder Dienstprinzipal aus. Wählen Sie dann +Mitglieder auswählen.

  7. Suchen Sie im Dialogfeld nach Ihrem Microsoft Entra-Benutzernamen (normalerweise Ihre user@domain E-Mail-Adresse). Wählen Sie unten im Dialogfeld "Auswählen" aus.

  8. Wählen Sie "Überprüfen" und "Zuweisen" aus, um zur endgültigen Seite zu wechseln. Wählen Sie „Überprüfen + zuweisen“ erneut aus, um den Vorgang abzuschließen.

Senden von Ereignisse

Erstellen Sie in diesem Abschnitt ein Python-Skript, um Ereignisse an den Event Hub zu senden, den Sie zuvor erstellt haben.

  1. Öffnen Sie Ihren bevorzugten Python-Editor, z.B. Visual Studio Code.

  2. Erstellen Sie ein Skript mit dem Namen send.py. Dieses Skript sendet einen Batch von Ereignissen an den Event Hub, den Sie zuvor erstellt haben.

  3. Fügen Sie den folgenden Code in send.py ein:

    Ersetzen Sie im Code die folgenden Platzhalter durch echte Werte:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE – Auf der Seite "Übersicht" des Namespace wird der vollqualifizierte Name angezeigt. Er sollte im Format vorliegen: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME: Name des Event Hubs.
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        print("Producer client created successfully.") 
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Hinweis

    Beispiele für andere Optionen zum asynchronen Senden von Ereignissen an einen Event Hub mithilfe einer Verbindungszeichenfolge finden Sie auf der GitHub-send_async.py-Seite. Die dort gezeigten Muster lassen sich auch auf das kennwortlose Senden von Ereignissen anwenden.

Empfangen von Ereignissen

In dieser Schnellstartanleitung wird Azure Blob Storage als Prüfpunktspeicher verwendet. Der Prüfpunktspeicher wird verwendet, um Prüfpunkte (d. h. die letzte Leseposition) beizubehalten.

Befolgen Sie diese Empfehlungen, wenn Sie Azure Blob Storage als Prüfpunktspeicher verwenden:

  • Verwenden Sie einen separaten Container für jede Consumergruppe. Sie können dasselbe Speicherkonto verwenden, aber verwenden Sie für jede Gruppe einen eigenen Container.
  • Verwenden Sie das Speicherkonto nicht für andere Elemente.
  • Verwenden Sie den Container nicht für andere Elemente.
  • Erstellen Sie das Speicherkonto in derselben Region wie die bereitgestellte Anwendung. Wenn die Anwendung lokal ist, versuchen Sie, die nächstgelegene Region auszuwählen.

Stellen Sie auf der Seite Speicherkonto im Azure-Portal im Abschnitt Blobdienst sicher, dass die folgenden Einstellungen deaktiviert sind.

  • Hierarchischer Namespace
  • Vorläufiges Löschen von Blobs
  • Versionsverwaltung

Erstellen eines Azure-Speicherkontos und eines Blobcontainers

Führen Sie die folgenden Schritte aus, um ein Azure-Speicherkonto und einen darin befindlichen Blobcontainer zu erstellen:

  1. Erstellen Sie ein Azure Storage-Konto.
  2. Erstellen Sie einen Blobcontainer.
  3. Authentifizieren Sie sich beim Blobcontainer.

Notieren Sie sich die Verbindungszeichenfolge und den Containernamen zur späteren Verwendung im Empfangscode.

Stellen Sie bei der lokalen Entwicklung sicher, dass das Benutzerkonto, das auf BLOB-Daten zugreift, über die richtigen Berechtigungen verfügt. Sie benötigen die Berechtigung Mitwirkender an Speicherblobdaten zum Lesen und Schreiben von Blobdaten. Um sich selbst diese Rolle zuzuweisen, müssen Sie der Rolle "Benutzerzugriffsadministrator " oder einer anderen Rolle zugewiesen werden, die die Aktion "Microsoft.Authorization/roleAssignments/write " enthält. Sie können einem Benutzer Azure RBAC-Rollen über das Azure-Portal, die Azure CLI oder mit Azure PowerShell zuweisen. Weitere Informationen finden Sie in der Grundlegendes zum Bereich von Azure RBAC.

In diesem Szenario weisen Sie Ihrem Benutzerkonto Berechtigungen zu, die auf das Speicherkonto festgelegt sind, um dem Prinzip der geringsten Rechte zu folgen. Auf diese Weise erhalten Benutzer nur die erforderlichen Mindestberechtigungen, und es entstehen sicherere Produktionsumgebungen.

Im folgenden Beispiel wird Ihrem Benutzerkonto die Rolle " Storage Blob Data Contributor" zugewiesen, die sowohl Lese- als auch Schreibzugriff auf BLOB-Daten in Ihrem Speicherkonto bietet.

Wichtig

In der Regel dauert die Verteilung der Rollenzuweisung in Azure ein bis zwei Minuten. In seltenen Fällen kann es bis zu acht Minuten dauern. Wenn bei der ersten Ausführung Ihres Codes Authentifizierungsfehler auftreten, warten Sie einige Momente, und versuchen Sie es dann erneut.

  1. Suchen Sie im Azure-Portal Ihr Speicherkonto mithilfe der Hauptsuchleiste oder der linken Navigationsleiste.

  2. Wählen Sie auf der Seite "Speicherkonto" im linken Menü die Zugriffssteuerung (IAM) aus.

  3. Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Registerkarte Rollenzuweisungen aus.

  4. Wählen Sie im oberen Menü +Hinzufügen aus. Wählen Sie dann " Rollenzuweisung hinzufügen" aus.

    Screenshot, der zeigt, wie Sie eine Speicherkontorolle zuweisen.

  5. Über das Suchfeld können Sie die Ergebnisse für die gewünschte Rolle filtern. Suchen Sie in diesem Beispiel nach Storage Blob Data Contributor. Wählen Sie das übereinstimmende Ergebnis aus, und wählen Sie dann Nextaus.

  6. Wählen Sie unter Zugriff zuweisen zu die Option Benutzer, Gruppe oder Dienstprinzipal und dann die Option + Mitglieder auswählen aus.

  7. Suchen Sie im Dialogfeld nach Ihrem Microsoft Entra-Benutzernamen (normalerweise Ihre E-Mail-Adresse benutzer@domäne), und wählen Sie unten im Dialogfeld Auswählen aus.

  8. Wählen Sie "Überprüfen" und "Zuweisen" aus, um zur endgültigen Seite zu wechseln. Wählen Sie „Überprüfen + zuweisen“ erneut aus, um den Vorgang abzuschließen.

Installieren der Pakete zum Empfangen von Ereignissen

Für die empfangende Seite muss mindestens ein weiteres Paket installiert werden. In dieser Schnellstartanleitung verwenden Sie Azure Blob Storage, um Prüfpunkte beizubehalten, damit das Programm die bereits gelesenen Ereignisse nicht liest. In einem Blob werden für empfangene Nachrichten in regelmäßigen Abständen Metadatenprüfpunkte erstellt. Dadurch kann der Empfang von Nachrichten später problemlos an der Stelle fortgesetzt werden, an der Sie aufgehört haben.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Erstellen eines Python-Skripts zum Empfangen von Ereignissen

In diesem Abschnitt erstellen Sie ein Python-Skript, um Ereignisse aus ihrem Event Hub zu empfangen:

  1. Öffnen Sie Ihren bevorzugten Python-Editor, z.B. Visual Studio Code.

  2. Erstellen Sie ein Skript mit dem Namen recv.py.

  3. Fügen Sie den folgenden Code in recv.py ein:

    Ersetzen Sie im Code die folgenden Platzhalter durch echte Werte:

    • BLOB_STORAGE_ACCOUNT_URL - Dieser Wert sollte im Format vorliegen: https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
    • BLOB_CONTAINER_NAME – Name des BLOB-Containers im Azure-Speicherkonto.
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE – Auf der Seite "Übersicht" des Namespace wird der vollqualifizierte Name angezeigt. Er sollte im Format vorliegen: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME: Name des Event Hubs.
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Hinweis

    Beispiele für weitere Optionen zum Empfangen von Ereignissen von einem Event Hub asynchron mithilfe einer Verbindungszeichenfolge finden Sie auf der GitHub-recv_with_checkpoint_store_async.py-Seite. Die dort gezeigten Muster lassen sich auch auf das kennwortlose Empfangen von Ereignissen anwenden.

Ausführen der Empfänger-App

  1. Starten Sie eine Eingabeaufforderung.

  2. Führen Sie den folgenden Befehl aus, und melden Sie sich mit dem Konto an, das der Rolle "Azure Event Hubs Data Owner " im Event Hubs-Namespace und der Rolle " Storage Blob Data Contributor " im Azure-Speicherkonto hinzugefügt wurde.

    az login
    
  3. Wechseln Sie zu dem Ordner mit der receive.py-Datei, und führen Sie den folgenden Befehl aus:

    python recv.py
    

Ausführen der Sender-App

  1. Starten Sie eine Eingabeaufforderung.

  2. Führen Sie den folgenden Befehl aus, und melden Sie sich mit dem Konto an, das der Rolle "Azure Event Hubs Data Owner " im Event Hubs-Namespace und der Rolle " Storage Blob Data Contributor " im Azure-Speicherkonto hinzugefügt wurde.

    az login
    
  3. Wechseln Sie zu dem Ordner mit dem send.py, und führen Sie dann den folgenden Befehl aus:

    python send.py
    

Die Nachrichten, die an den Event Hub gesendet wurden, sollten im Empfängerfenster angezeigt werden.

Problembehandlung

Wenn im Empfängerfenster keine Ereignisse angezeigt werden oder der Code einen Fehler meldet, probieren Sie die folgenden Tipps zur Problembehandlung aus:

  • Wenn keine Ergebnisse aus recy.py angezeigt werden, führen Sie send.py mehrmals aus.

  • Wenn bei Verwendung des kennwortlosen Codes (mit Anmeldeinformationen) Fehler zu „coroutine“ angezeigt werden, stellen Sie sicher, dass Sie den Import aus azure.identity.aio verwenden.

  • Wenn bei Verwendung des kennwortlosen Codes (mit Anmeldeinformationen) Fehler „Nicht geschlossene Clientsitzung“ angezeigt wird, stellen Sie sicher, dass Sie die Sitzung schließen, wenn Sie fertig sind. Weitere Informationen finden Sie unter Asynchrone Anmeldeinformationen.

  • Wenn beim Zugriff auf den Speicher Autorisierungsfehler bei recv.py angezeigt werden, stellen Sie sicher, dass Sie die Schritte unter Erstellen eines Azure-Speicherkontos und eines Blobcontainers ausgeführt und dem Dienstprinzipal die Rolle Mitwirkender an Storage-Blobdaten zugewiesen haben.

  • Wenn Sie Ereignisse mit unterschiedlichen Partitions-IDs erhalten, wird dieses Ergebnis erwartet. Partitionen sind ein Mechanismus zum Organisieren von Daten, der sich auf die erforderliche Downstreamparallelität in verarbeitenden Anwendungen bezieht. Die Anzahl der Partitionen in einem Event Hub steht in direktem Zusammenhang mit der erwarteten Anzahl von gleichzeitigen Lesern. Weitere Informationen finden Sie unter Weitere Informationen zu Partitionen.

Nächste Schritte

In dieser Schnellstartanleitung haben Sie Ereignisse asynchron gesendet und empfangen. Beispiele zum synchronen Senden und Empfangen von Ereignissen finden Sie auf der GitHub-Seite „sync_samples“.

Erkunden Sie weitere Beispiele und erweiterte Szenarien in der Azure Event Hubs-Clientbibliothek für Python-Beispiele.