Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In dieser Schnellstartanleitung erfahren Sie, wie Sie Ereignisse mithilfe des Azure-eventhub Python-Pakets an einen Event Hub senden und ereignisse von einem Event Hub empfangen.
Voraussetzungen
Wenn Sie mit Azure Event Hubs noch nicht vertraut sind, lesen Sie vor dem Durcharbeiten dieser Schnellstartanleitung die Informationen unter Übersicht über Event Hubs.
Um diese Schnellstartanleitung abzuschließen, stellen Sie sicher, dass Sie über die folgenden Voraussetzungen verfügen:
- Microsoft Azure-Abonnement: Registrieren Sie sich für eine kostenlose Testversion , wenn Sie kein Abonnement haben.
- Python 3.8 oder höher: Stellen Sie sicher, dass pip installiert und aktualisiert wird.
- Visual Studio Code (empfohlen): Oder verwenden Sie eine andere IDE Ihrer Wahl.
- Event Hubs-Namespace und Event Hub: Befolgen Sie dieses Handbuch , um sie im Azure-Portal zu erstellen.
Installieren der Pakete zum Senden von Ereignissen
Um die Python-Pakete für Event Hubs zu installieren, öffnen Sie eine Eingabeaufforderung mit Python im Pfad. Ändern Sie das Verzeichnis in den Ordner, in dem Sie Ihre Beispiele speichern möchten.
pip install azure-eventhub
pip install azure-identity
pip install aiohttp
Authentifizieren der App bei Azure
In dieser Schnellstartanleitung werden zwei Möglichkeiten zum Herstellen einer Verbindung mit Azure Event Hubs gezeigt:
- Kennwortlos. Verwenden Sie Ihren Sicherheitsprinzipal in Microsoft Entra ID und die rollenbasierte Zugriffssteuerung (RBAC), um eine Verbindung zu einem Event Hubs Namespace herzustellen. Sie müssen sich keine Gedanken darüber machen, dass hartcodierte Verbindungszeichenfolgen in Ihrem Code, in einer Konfigurationsdatei oder im sicheren Speicher wie Azure Key Vault vorhanden sind.
- Verbindungszeichenfolge. Verwenden Sie eine Verbindungszeichenfolge, um eine Verbindung mit einem Event Hubs-Namespace herzustellen. Wenn Sie noch nicht mit Azure vertraut sind, ist die Option mit einer Verbindungszeichenfolge möglicherweise einfacher.
In realen Anwendungen und Produktionsumgebungen wird die kennwortlose Option empfohlen. Weitere Informationen finden Sie unter Service Bus-Authentifizierung und Autorisierung und kennwortloseVerbindungen für Azure-Dienste.
Zuweisen von Rollen zu Ihrem Microsoft Entra-Benutzer
Stellen Sie bei der lokalen Entwicklung sicher, dass das Benutzerkonto, das eine Verbindung mit Azure Event Hubs herstellt, über die richtigen Berechtigungen verfügt. Sie benötigen die Azure Event Hubs-Rolle "Datenbesitzer ", um Nachrichten zu senden und zu empfangen. Um sich selbst diese Rolle zuzuweisen, benötigen Sie die Rolle des Benutzerzugriffsadministrators oder eine andere Rolle, die die Microsoft.Authorization/roleAssignments/write
Aktion enthält. Sie können einem Benutzer Azure RBAC-Rollen über das Azure-Portal, die Azure CLI oder mit Azure PowerShell zuweisen. Weitere Informationen finden Sie auf der Seite Umfang von Azure RBAC verstehen.
Im folgenden Beispiel wird Ihrem Benutzerkonto die Rolle Azure Event Hubs Data Owner
zugewiesen. Diese Rolle bietet Vollzugriff auf Azure Event Hubs-Ressourcen. Halten Sie sich in einem echten Szenario an das Prinzip der geringsten Rechte, um Benutzern nur die benötigten Mindestberechtigungen zu erteilen und so die Produktionsumgebung besser zu schützen.
In Azure integrierte Rollen für Azure Event Hubs
Bei Azure Event Hubs ist die Verwaltung der Namespaces und aller zugehörigen Ressourcen über das Azure-Portal und die Azure-Ressourcenverwaltungs-API bereits durch das Azure RBAC-Modell geschützt. Azure bietet die folgenden integrierten Rollen zum Autorisieren des Zugriffs auf einen Event Hubs-Namespace:
- Azure Event Hubs-Datenbesitzer: Ermöglicht den Datenzugriff auf den Event Hubs-Namespace und seine Entitäten (Warteschlangen, Themen, Abonnements und Filter).
- Azure Event Hubs-Datensender: Verwenden Sie diese Rolle, um dem Event Hubs-Namespace und seinen Entitäten Sendezugriff zu erteilen.
- Azure Event Hubs-Datenempfänger: Verwenden Sie diese Rolle, um dem Event Hubs-Namespace und seinen Entitäten Empfangszugriff zu erteilen.
Informationen zum Erstellen einer benutzerdefinierten Rolle finden Sie unter Erforderliche Rechte für Event Hubs-Vorgänge.
Wichtig
In der Regel dauert die Verteilung der Rollenzuweisung in Azure ein bis zwei Minuten. In seltenen Fällen kann es bis zu acht Minuten dauern. Wenn bei der ersten Ausführung Ihres Codes Authentifizierungsfehler auftreten, warten Sie einige Momente, und versuchen Sie es dann erneut.
Navigieren Sie im Azure-Portal zu Ihrem Event Hubs-Namespace. Verwenden Sie dazu entweder die Hauptsuchleiste oder die linke Navigationsleiste.
Wählen Sie auf der Übersichtsseite im linken Menü die Option Zugriffssteuerung (IAM) aus.
Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Registerkarte Rollenzuweisungen aus.
Wählen Sie im oberen Menü +Hinzufügen aus. Wählen Sie dann " Rollenzuweisung hinzufügen" aus.
Über das Suchfeld können Sie die Ergebnisse für die gewünschte Rolle filtern. Suchen Sie in diesem Beispiel nach
Azure Event Hubs Data Owner
, und wählen Sie das entsprechende Ergebnis aus. Klicken Sie dann auf Weiter.Wählen Sie unter Zugriff zuweisen für die Option Benutzer, Gruppe oder Dienstprinzipal aus. Wählen Sie dann +Mitglieder auswählen.
Suchen Sie im Dialogfeld nach Ihrem Microsoft Entra-Benutzernamen (normalerweise Ihre user@domain E-Mail-Adresse). Wählen Sie unten im Dialogfeld "Auswählen" aus.
Wählen Sie "Überprüfen" und "Zuweisen" aus, um zur endgültigen Seite zu wechseln. Wählen Sie „Überprüfen + zuweisen“ erneut aus, um den Vorgang abzuschließen.
Senden von Ereignisse
Erstellen Sie in diesem Abschnitt ein Python-Skript, um Ereignisse an den Event Hub zu senden, den Sie zuvor erstellt haben.
Öffnen Sie Ihren bevorzugten Python-Editor, z.B. Visual Studio Code.
Erstellen Sie ein Skript mit dem Namen send.py. Dieses Skript sendet einen Batch von Ereignissen an den Event Hub, den Sie zuvor erstellt haben.
Fügen Sie den folgenden Code in send.py ein:
Ersetzen Sie im Code die folgenden Platzhalter durch echte Werte:
-
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
– Auf der Seite "Übersicht" des Namespace wird der vollqualifizierte Name angezeigt. Er sollte im Format vorliegen:<NAMESPACENAME>>.servicebus.windows.net
. -
EVENT_HUB_NAME
: Name des Event Hubs.
import asyncio from azure.eventhub import EventData from azure.eventhub.aio import EventHubProducerClient from azure.identity.aio import DefaultAzureCredential EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def run(): # Create a producer client to send messages to the event hub. # Specify a credential that has correct role assigned to access # event hubs namespace and the event hub name. producer = EventHubProducerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, credential=credential, ) print("Producer client created successfully.") async with producer: # Create a batch. event_data_batch = await producer.create_batch() # Add events to the batch. event_data_batch.add(EventData("First event ")) event_data_batch.add(EventData("Second event")) event_data_batch.add(EventData("Third event")) # Send the batch of events to the event hub. await producer.send_batch(event_data_batch) # Close credential when no longer needed. await credential.close() asyncio.run(run())
Hinweis
Beispiele für andere Optionen zum asynchronen Senden von Ereignissen an einen Event Hub mithilfe einer Verbindungszeichenfolge finden Sie auf der GitHub-send_async.py-Seite. Die dort gezeigten Muster lassen sich auch auf das kennwortlose Senden von Ereignissen anwenden.
-
Empfangen von Ereignissen
In dieser Schnellstartanleitung wird Azure Blob Storage als Prüfpunktspeicher verwendet. Der Prüfpunktspeicher wird verwendet, um Prüfpunkte (d. h. die letzte Leseposition) beizubehalten.
Befolgen Sie diese Empfehlungen, wenn Sie Azure Blob Storage als Prüfpunktspeicher verwenden:
- Verwenden Sie einen separaten Container für jede Consumergruppe. Sie können dasselbe Speicherkonto verwenden, aber verwenden Sie für jede Gruppe einen eigenen Container.
- Verwenden Sie das Speicherkonto nicht für andere Elemente.
- Verwenden Sie den Container nicht für andere Elemente.
- Erstellen Sie das Speicherkonto in derselben Region wie die bereitgestellte Anwendung. Wenn die Anwendung lokal ist, versuchen Sie, die nächstgelegene Region auszuwählen.
Stellen Sie auf der Seite Speicherkonto im Azure-Portal im Abschnitt Blobdienst sicher, dass die folgenden Einstellungen deaktiviert sind.
- Hierarchischer Namespace
- Vorläufiges Löschen von Blobs
- Versionsverwaltung
Erstellen eines Azure-Speicherkontos und eines Blobcontainers
Führen Sie die folgenden Schritte aus, um ein Azure-Speicherkonto und einen darin befindlichen Blobcontainer zu erstellen:
- Erstellen Sie ein Azure Storage-Konto.
- Erstellen Sie einen Blobcontainer.
- Authentifizieren Sie sich beim Blobcontainer.
Notieren Sie sich die Verbindungszeichenfolge und den Containernamen zur späteren Verwendung im Empfangscode.
Stellen Sie bei der lokalen Entwicklung sicher, dass das Benutzerkonto, das auf BLOB-Daten zugreift, über die richtigen Berechtigungen verfügt. Sie benötigen die Berechtigung Mitwirkender an Speicherblobdaten zum Lesen und Schreiben von Blobdaten. Um sich selbst diese Rolle zuzuweisen, müssen Sie der Rolle "Benutzerzugriffsadministrator " oder einer anderen Rolle zugewiesen werden, die die Aktion "Microsoft.Authorization/roleAssignments/write " enthält. Sie können einem Benutzer Azure RBAC-Rollen über das Azure-Portal, die Azure CLI oder mit Azure PowerShell zuweisen. Weitere Informationen finden Sie in der Grundlegendes zum Bereich von Azure RBAC.
In diesem Szenario weisen Sie Ihrem Benutzerkonto Berechtigungen zu, die auf das Speicherkonto festgelegt sind, um dem Prinzip der geringsten Rechte zu folgen. Auf diese Weise erhalten Benutzer nur die erforderlichen Mindestberechtigungen, und es entstehen sicherere Produktionsumgebungen.
Im folgenden Beispiel wird Ihrem Benutzerkonto die Rolle " Storage Blob Data Contributor" zugewiesen, die sowohl Lese- als auch Schreibzugriff auf BLOB-Daten in Ihrem Speicherkonto bietet.
Wichtig
In der Regel dauert die Verteilung der Rollenzuweisung in Azure ein bis zwei Minuten. In seltenen Fällen kann es bis zu acht Minuten dauern. Wenn bei der ersten Ausführung Ihres Codes Authentifizierungsfehler auftreten, warten Sie einige Momente, und versuchen Sie es dann erneut.
Suchen Sie im Azure-Portal Ihr Speicherkonto mithilfe der Hauptsuchleiste oder der linken Navigationsleiste.
Wählen Sie auf der Seite "Speicherkonto" im linken Menü die Zugriffssteuerung (IAM) aus.
Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Registerkarte Rollenzuweisungen aus.
Wählen Sie im oberen Menü +Hinzufügen aus. Wählen Sie dann " Rollenzuweisung hinzufügen" aus.
Über das Suchfeld können Sie die Ergebnisse für die gewünschte Rolle filtern. Suchen Sie in diesem Beispiel nach Storage Blob Data Contributor. Wählen Sie das übereinstimmende Ergebnis aus, und wählen Sie dann Nextaus.
Wählen Sie unter Zugriff zuweisen zu die Option Benutzer, Gruppe oder Dienstprinzipal und dann die Option + Mitglieder auswählen aus.
Suchen Sie im Dialogfeld nach Ihrem Microsoft Entra-Benutzernamen (normalerweise Ihre E-Mail-Adresse benutzer@domäne), und wählen Sie unten im Dialogfeld Auswählen aus.
Wählen Sie "Überprüfen" und "Zuweisen" aus, um zur endgültigen Seite zu wechseln. Wählen Sie „Überprüfen + zuweisen“ erneut aus, um den Vorgang abzuschließen.
Installieren der Pakete zum Empfangen von Ereignissen
Für die empfangende Seite muss mindestens ein weiteres Paket installiert werden. In dieser Schnellstartanleitung verwenden Sie Azure Blob Storage, um Prüfpunkte beizubehalten, damit das Programm die bereits gelesenen Ereignisse nicht liest. In einem Blob werden für empfangene Nachrichten in regelmäßigen Abständen Metadatenprüfpunkte erstellt. Dadurch kann der Empfang von Nachrichten später problemlos an der Stelle fortgesetzt werden, an der Sie aufgehört haben.
pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity
Erstellen eines Python-Skripts zum Empfangen von Ereignissen
In diesem Abschnitt erstellen Sie ein Python-Skript, um Ereignisse aus ihrem Event Hub zu empfangen:
Öffnen Sie Ihren bevorzugten Python-Editor, z.B. Visual Studio Code.
Erstellen Sie ein Skript mit dem Namen recv.py.
Fügen Sie den folgenden Code in recv.py ein:
Ersetzen Sie im Code die folgenden Platzhalter durch echte Werte:
-
BLOB_STORAGE_ACCOUNT_URL
- Dieser Wert sollte im Format vorliegen:https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
-
BLOB_CONTAINER_NAME
– Name des BLOB-Containers im Azure-Speicherkonto. -
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
– Auf der Seite "Übersicht" des Namespace wird der vollqualifizierte Name angezeigt. Er sollte im Format vorliegen:<NAMESPACENAME>>.servicebus.windows.net
. -
EVENT_HUB_NAME
: Name des Event Hubs.
import asyncio from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import ( BlobCheckpointStore, ) from azure.identity.aio import DefaultAzureCredential BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL" BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME" EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def on_event(partition_context, event): # Print the event data. print( 'Received the event: "{}" from the partition with ID: "{}"'.format( event.body_as_str(encoding="UTF-8"), partition_context.partition_id ) ) # Update the checkpoint so that the program doesn't read the events # that it has already read when you run it next time. await partition_context.update_checkpoint(event) async def main(): # Create an Azure blob checkpoint store to store the checkpoints. checkpoint_store = BlobCheckpointStore( blob_account_url=BLOB_STORAGE_ACCOUNT_URL, container_name=BLOB_CONTAINER_NAME, credential=credential, ) # Create a consumer client for the event hub. client = EventHubConsumerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, consumer_group="$Default", checkpoint_store=checkpoint_store, credential=credential, ) async with client: # Call the receive method. Read from the beginning of the partition # (starting_position: "-1") await client.receive(on_event=on_event, starting_position="-1") # Close credential when no longer needed. await credential.close() if __name__ == "__main__": # Run the main method. asyncio.run(main())
Hinweis
Beispiele für weitere Optionen zum Empfangen von Ereignissen von einem Event Hub asynchron mithilfe einer Verbindungszeichenfolge finden Sie auf der GitHub-recv_with_checkpoint_store_async.py-Seite. Die dort gezeigten Muster lassen sich auch auf das kennwortlose Empfangen von Ereignissen anwenden.
-
Ausführen der Empfänger-App
Starten Sie eine Eingabeaufforderung.
Führen Sie den folgenden Befehl aus, und melden Sie sich mit dem Konto an, das der Rolle "Azure Event Hubs Data Owner " im Event Hubs-Namespace und der Rolle " Storage Blob Data Contributor " im Azure-Speicherkonto hinzugefügt wurde.
az login
Wechseln Sie zu dem Ordner mit der receive.py-Datei, und führen Sie den folgenden Befehl aus:
python recv.py
Ausführen der Sender-App
Starten Sie eine Eingabeaufforderung.
Führen Sie den folgenden Befehl aus, und melden Sie sich mit dem Konto an, das der Rolle "Azure Event Hubs Data Owner " im Event Hubs-Namespace und der Rolle " Storage Blob Data Contributor " im Azure-Speicherkonto hinzugefügt wurde.
az login
Wechseln Sie zu dem Ordner mit dem send.py, und führen Sie dann den folgenden Befehl aus:
python send.py
Die Nachrichten, die an den Event Hub gesendet wurden, sollten im Empfängerfenster angezeigt werden.
Problembehandlung
Wenn im Empfängerfenster keine Ereignisse angezeigt werden oder der Code einen Fehler meldet, probieren Sie die folgenden Tipps zur Problembehandlung aus:
Wenn keine Ergebnisse aus recy.py angezeigt werden, führen Sie send.py mehrmals aus.
Wenn bei Verwendung des kennwortlosen Codes (mit Anmeldeinformationen) Fehler zu „coroutine“ angezeigt werden, stellen Sie sicher, dass Sie den Import aus
azure.identity.aio
verwenden.Wenn bei Verwendung des kennwortlosen Codes (mit Anmeldeinformationen) Fehler „Nicht geschlossene Clientsitzung“ angezeigt wird, stellen Sie sicher, dass Sie die Sitzung schließen, wenn Sie fertig sind. Weitere Informationen finden Sie unter Asynchrone Anmeldeinformationen.
Wenn beim Zugriff auf den Speicher Autorisierungsfehler bei recv.py angezeigt werden, stellen Sie sicher, dass Sie die Schritte unter Erstellen eines Azure-Speicherkontos und eines Blobcontainers ausgeführt und dem Dienstprinzipal die Rolle Mitwirkender an Storage-Blobdaten zugewiesen haben.
Wenn Sie Ereignisse mit unterschiedlichen Partitions-IDs erhalten, wird dieses Ergebnis erwartet. Partitionen sind ein Mechanismus zum Organisieren von Daten, der sich auf die erforderliche Downstreamparallelität in verarbeitenden Anwendungen bezieht. Die Anzahl der Partitionen in einem Event Hub steht in direktem Zusammenhang mit der erwarteten Anzahl von gleichzeitigen Lesern. Weitere Informationen finden Sie unter Weitere Informationen zu Partitionen.
Nächste Schritte
In dieser Schnellstartanleitung haben Sie Ereignisse asynchron gesendet und empfangen. Beispiele zum synchronen Senden und Empfangen von Ereignissen finden Sie auf der GitHub-Seite „sync_samples“.
Erkunden Sie weitere Beispiele und erweiterte Szenarien in der Azure Event Hubs-Clientbibliothek für Python-Beispiele.