Share via


Azure Event Hubs Clientbibliothek für Python – Version 5.11.5

Azure Event Hubs ist ein hochgradig skalierbarer Publish-Subscribe-Dienst, der Millionen von Ereignissen pro Sekunde erfassen und an mehrere Verbraucher streamen kann. Auf diese Weise können Sie die enormen Datenmengen verarbeiten und analysieren, die von Ihren verbundenen Geräten und Anwendungen erzeugt werden. Sobald Event Hubs die Daten gesammelt hat, können Sie sie mithilfe eines beliebigen Echtzeitanalyseanbieters oder mit Batching-/Speicheradaptern abrufen, transformieren und speichern. Wenn Sie mehr über Azure Event Hubs erfahren möchten, sollten Sie folgendes lesen: Was ist Event Hubs?

Die Azure Event Hubs-Clientbibliothek ermöglicht die Veröffentlichung und Nutzung von Azure Event Hubs-Ereignissen und kann für Folgendes verwendet werden:

  • Geben Sie Telemetriedaten zu Ihrer Anwendung für Business Intelligence- und Diagnose-Zwecke aus.
  • Veröffentlichen Sie Fakten zum Status Ihrer Anwendung, die von interessierten Parteien beobachtet und als Auslöser für die Ausführung von Aktionen verwendet werden können.
  • Beobachten Sie interessante Vorgänge und Interaktionen innerhalb Ihres Unternehmens oder anderen Ökosystems, sodass lose gekoppelte Systeme interagieren können, ohne sie miteinander verbinden zu müssen.
  • Empfangen Sie Ereignisse von einem oder mehreren Herausgebern, transformieren Sie sie, um die Anforderungen Ihres Ökosystems besser zu erfüllen, und veröffentlichen Sie die transformierten Ereignisse dann in einem neuen Stream, den Consumer beobachten können.

Quellcode | Paket (PyPi) | Paket (Conda) | API-Referenzdokumentation | Produktdokumentation | Proben

Erste Schritte

Voraussetzungen

  • Python 3.7 oder höher.

  • Microsoft Azure-Abonnement: Für die Verwendung von Azure-Diensten, einschließlich Azure Event Hubs, benötigen Sie ein Abonnement. Wenn Sie nicht über ein vorhandenes Azure-Konto verfügen, können Sie sich für eine kostenlose Testversion registrieren oder ihre MSDN-Abonnentenvorteile beim Erstellen eines Kontos nutzen.

  • Event Hubs-Namespace mit einem Event Hub: Um mit Azure Event Hubs zu interagieren, müssen Sie auch einen Namespace und Event Hub zur Verfügung haben. Wenn Sie mit dem Erstellen von Azure-Ressourcen nicht vertraut sind, sollten Sie die schrittweise Anleitung zum Erstellen eines Event Hubs mit dem Azure-Portal befolgen. Dort finden Sie auch ausführliche Anweisungen zum Verwenden von Azure CLI-, Azure PowerShell- oder Azure Resource Manager-Vorlagen (ARM) zum Erstellen eines Event Hubs.

Installieren des Pakets

Installieren Sie die Azure Event Hubs-Clientbibliothek für Python mit pip:

$ pip install azure-eventhub

Authentifizieren des Clients

Die Interaktion mit Event Hubs beginnt mit einer instance der EventHubConsumerClient- oder EventHubProducerClient-Klasse. Sie benötigen entweder den Hostnamen, SAS/AAD-Anmeldeinformationen und Event Hub-Namen oder einen Verbindungszeichenfolge, um das Clientobjekt zu instanziieren.

Erstellen Sie einen Client aus Verbindungszeichenfolge:

Damit die Event Hubs-Clientbibliothek mit einem Event Hub interagiert, besteht die einfachste Möglichkeit darin, eine Verbindungszeichenfolge zu verwenden, die beim Erstellen eines Event Hubs-Namespaces automatisch erstellt wird. Wenn Sie mit Shared Access-Richtlinien in Azure nicht vertraut sind, sollten Sie die schrittweise Anleitung befolgen, um eine Event Hubs-Verbindungszeichenfolge zu erhalten.

  • Die from_connection_string -Methode übernimmt die Verbindungszeichenfolge des Formular- Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> und Entitätsnamens an Ihre Event Hub-instance. Die Verbindungszeichenfolge kann über das Azure-Portal abgerufen werden.

Erstellen Sie einen Client mithilfe der Azure-Identity-Bibliothek:

Alternativ können Sie ein Credential-Objekt verwenden, um sich über AAD mit dem Paket azure-identity zu authentifizieren.

  • Dieser im oben verknüpften Beispiel veranschaulichte Konstruktor übernimmt den Hostnamen und Entitätsnamen Ihrer Event Hub-instance und Anmeldeinformationen, die das TokenCredential-Protokoll implementiert. Im Paket azure-identity sind Implementierungen des TokenCredential Protokolls verfügbar. Der Hostname hat das Format <yournamespace.servicebus.windows.net>.
  • Um die von azure-identitybereitgestellten Anmeldeinformationstypen zu verwenden, installieren Sie das Paket: pip install azure-identity
  • Um die asynchrone API verwenden zu können, müssen Sie außerdem zuerst einen asynchronen Transport installieren, z. B aiohttp. : pip install aiohttp
  • Bei Verwendung von Azure Active Directory muss Ihrem Prinzipal eine Rolle zugewiesen werden, die den Zugriff auf Event Hubs ermöglicht, z. B. die Rolle Azure Event Hubs Datenbesitzer. Weitere Informationen zur Verwendung der Azure Active Directory-Autorisierung mit Event Hubs finden Sie in der zugehörigen Dokumentation.

Wichtige Begriffe

  • Ein EventHubProducerClient ist eine Quelle von Telemetriedaten, Diagnose Informationen, Nutzungsprotokollen oder anderen Protokolldaten als Teil einer eingebetteten Gerätelösung, einer mobilen Geräteanwendung, eines Spieltitels, der auf einer Konsole oder einem anderen Gerät, einer client- oder serverbasierten Geschäftslösung oder einer Website ausgeführt wird.

  • Ein EventHubConsumerClient erfasst solche Informationen aus dem Event Hub und verarbeitet sie. Die Verarbeitung kann Aggregation, komplexe Berechnung und Filterung umfassen. Die Verarbeitung kann auch eine unformatierte oder transformierte Verteilung oder Speicherung der Informationen umfassen. Event Hub-Consumer sind häufig robuste und umfangreiche Komponenten der Plattforminfrastruktur mit integrierten Analysefunktionen wie Azure Stream Analytics, Apache Spark oder Apache Storm.

  • Eine Partition ist eine geordnete Sequenz Abfolge von Ereignissen, die in einem Event Hub erfolgt. Azure Event Hubs ermöglicht Nachrichtenstreaming über ein partitioniertes Consumermuster, bei dem jeder Consumer nur eine bestimmte Teilmenge oder Partition des Nachrichtendatenstroms liest. Neu eingehende Ereignisse werden am Ende dieser Sequenz hinzugefügt. Die Anzahl der Partitionen wird zum Zeitpunkt der Erstellung eines Event Hubs angegeben und kann nicht geändert werden.

  • Eine Consumergruppe ist eine Ansicht eines vollständigen Event Hubs. Consumergruppen ermöglichen es mehreren verarbeitenden Anwendungen, jeweils eine eigene Ansicht des Ereignisdatenstroms zu verwenden und den Datenstrom unabhängig voneinander in ihrem eigenen Tempo und von ihrer eigenen Position aus zu lesen. Pro Consumergruppe können höchstens fünf gleichzeitige Leser auf einer Partition vorhanden sein. Es wird jedoch empfohlen, nur einen aktiven Consumer für eine bestimmte Partitions- und Consumergruppenkopplung zu verwenden. Jeder aktive Leser empfängt alle Ereignisse aus seiner Partition. Wenn sich mehrere Reader auf derselben Partition befinden, erhalten sie doppelte Ereignisse.

Weitere Konzepte und ausführlichere Diskussionen finden Sie unter Event Hubs-Features. Außerdem sind die Konzepte für AMQP in OASIS Advanced Messaging Queuing Protocol (AMQP) Version 1.0 gut dokumentiert.

Threadsicherheit

Wir garantieren nicht, dass eventHubProducerClient oder EventHubConsumerClient threadsicher sind. Es wird nicht empfohlen, diese Instanzen threadsübergreifend wiederzuverwenden. Es liegt an der ausgeführten Anwendung, diese Klassen threadsicher zu verwenden.

Der Datentyp EventDataBatch ist nicht threadsicher. Es sollte nicht threadsübergreifend freigegeben und nicht gleichzeitig mit Clientmethoden verwendet werden.

Beispiele

Die folgenden Abschnitte enthalten mehrere Codeausschnitte, die einige der häufigsten Event Hubs-Aufgaben behandeln, einschließlich:

Untersuchen eines Event Hubs

Rufen Sie die Partitions-ID eines Event Hubs ab.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

Veröffentlichen von Ereignissen für einen Event Hub

Verwenden Sie die create_batch -Methode für EventHubProducerClient , um ein EventDataBatch Objekt zu erstellen, das dann mit der send_batch -Methode gesendet werden kann. Ereignisse können mithilfe der EventDataBatchadd -Methode hinzugefügt werden, bis die maximale Batchgröße in Bytes erreicht wurde.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

Nutzen von Ereignissen aus einem Event Hub

Es gibt mehrere Möglichkeiten, Ereignisse von einem EventHub zu nutzen. Um einfach einen Rückruf auszulösen, wenn ein Ereignis empfangen wird, ist die EventHubConsumerClient.receive Methode wie folgt von Nutzen:

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

Nutzen von Ereignissen aus einem Event Hub in Batches

Während im obigen Beispiel der Rückruf für jede Nachricht ausgelöst wird, während sie empfangen wird, löst das folgende Beispiel den Rückruf für einen Batch von Ereignissen aus, wobei versucht wird, eine Nummer gleichzeitig zu erhalten.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

asynchrones Veröffentlichen von Ereignissen in einem Event Hub

Verwenden Sie die create_batch -Methode für EventHubProducer , um ein EventDataBatch Objekt zu erstellen, das dann mit der send_batch -Methode gesendet werden kann. Ereignisse können mithilfe der EventDataBatchadd -Methode hinzugefügt werden, bis die maximale Batchgröße in Bytes erreicht wurde.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

asynchrone Nutzung von Ereignissen aus einem Event Hub

Dieses SDK unterstützt synchronen und asynchronen Code. Um wie in den obigen Beispielen gezeigt zu empfangen, aber innerhalb von aio ist Folgendes erforderlich:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

asynchrone Nutzung von Ereignissen aus einem Event Hub in Batches

Alle synchronen Funktionen werden auch in aio unterstützt. Wie oben für den synchronen Batchempfang gezeigt, kann dasselbe in asyncio wie folgt ausgeführt werden:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

Nutzen von Ereignissen und Speichern von Prüfpunkten mithilfe eines Prüfpunktspeichers

EventHubConsumerClient ist ein allgemeines Konstrukt, mit dem Sie Ereignisse von mehreren Partitionen gleichzeitig empfangen und einen Lastenausgleich mit anderen Consumern durchführen können, die dieselbe Event Hub- und Consumergruppe verwenden.

Dadurch kann der Benutzer auch den Fortschritt nachverfolgen, wenn Ereignisse mithilfe von Prüfpunkten verarbeitet werden.

Ein Prüfpunkt soll das letzte erfolgreich verarbeitete Ereignis des Benutzers aus einer bestimmten Partition einer Consumergruppe in einem Event Hub-instance darstellen. Die EventHubConsumerClient verwendet eine instance vonCheckpointStore, um Prüfpunkte zu aktualisieren und die relevanten Informationen zu speichern, die für den Lastenausgleichsalgorithmus erforderlich sind.

Suchen Sie pypi mit dem Präfix azure-eventhub-checkpointstore , um Pakete zu finden, die dies unterstützen, und verwenden Sie die CheckpointStore Implementierung aus einem solchen Paket. Beachten Sie, dass sowohl Synchronisierungs- als auch asynchrone Bibliotheken bereitgestellt werden.

Im folgenden Beispiel erstellen wir eine instance von EventHubConsumerClient und verwenden einen BlobCheckpointStore. Sie müssen ein Azure Storage-Konto und einen Blobcontainer erstellen, um den Code auszuführen.

Azure Blob Storage Checkpoint Store Async und Azure Blob Storage Checkpoint Store Sync sind eine der Implementierungen, die CheckpointStore wir bereitstellen, die Azure Blob Storage als persistenter Speicher anwendet.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Verwenden von EventHubConsumerClient zum Arbeiten mit IoT Hub

Sie können auch verwendenEventHubConsumerClient, um mit IoT Hub zu arbeiten. Dies ist nützlich, um Telemetriedaten von IoT Hub vom verknüpften EventHub zu empfangen. Die zugeordnete Verbindungszeichenfolge hat keine Sendeansprüche, daher ist das Senden von Ereignissen nicht möglich.

Beachten Sie, dass die Verbindungszeichenfolge für einen Event Hub-kompatiblen Endpunkt sein muss, z. B. "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"

Es gibt zwei Möglichkeiten, den Event Hubs-kompatiblen Endpunkt abzurufen:

  • Rufen Sie die integrierten Endpunkte der IoT Hub im Azure-Portal manuell ab, und empfangen Sie sie.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

Problembehandlung

Ausführliche Informationen zur Diagnose verschiedener Fehlerszenarien finden Sie im azure-eventhubsLeitfaden zur Problembehandlung .

Nächste Schritte

Weiterer Beispielcode

Im Beispielverzeichnis finden Sie ausführliche Beispiele für die Verwendung dieser Bibliothek zum Senden und Empfangen von Ereignissen an/von Event Hubs.

Dokumentation

Die Referenzdokumentation finden Sie hier.

Schemaregistrierung und Avro-Encoder

Das EventHubs SDK lässt sich gut in den Schemaregistrierungsdienst und Avro integrieren. Weitere Informationen finden Sie unter Schema Registry SDK und Schema Registry Avro Encoder SDK.

Reine Python-AMQP-Transport- und Abwärtskompatibilitätsunterstützung

Die Azure Event Hubs-Clientbibliothek basiert jetzt auf einer reinen Python-AMQP-Implementierung. uAMQP wurde als erforderliche Abhängigkeit entfernt.

So verwenden Sie uAMQP als zugrunde liegenden Transport:

  1. Installieren Sie uamqp mit pip.
$ pip install uamqp 
  1. Passieren Sie uamqp_transport=True während des Baus des Kunden.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

Hinweis: Das message Attribut auf EventData/EventDataBatch, das zuvor verfügbar gemacht uamqp.Messagehat, ist veraltet. Die von zurückgegebenen EventData.message/EventDataBatch.message "Legacy"-Objekte wurden eingeführt, um den Übergang zu erleichtern.

Erstellen des uAMQP-Rads aus der Quelle

Wenn uAMQP als zugrunde liegende AMQP-Protokollimplementierung für azure-eventhubverwendet werden soll, können uAMQP-Räder für die meisten wichtigen Betriebssysteme gefunden werden.

Wenn Sie die Verwendung uAMQP beabsichtigen und sie auf einer Plattform ausführen, für die uAMQP-Räder nicht bereitgestellt werden, befolgen Sie die uAMQP-Installationsanleitung , um sie von der Quelle aus zu installieren.

Feedback geben

Wenn Fehler auftreten oder Vorschläge vorliegen, melden Sie ein Problem im Abschnitt Probleme des Projekts.

Mitwirken

Beiträge und Vorschläge für dieses Projekt sind willkommen. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. Ausführliche Informationen finden Sie unter https://cla.microsoft.com.

Wenn Sie einen Pull Request (PR) übermitteln, überprüft ein CLA-Bot automatisch, ob Sie eine Lizenzvereinbarung bereitstellen und den PR entsprechend ergänzen müssen (z.B. mit einer Bezeichnung oder einem Kommentar). Führen Sie einfach die Anweisungen des Bots aus. Sie müssen dies nur einmal für alle Repositorys ausführen, die unsere CLA verwenden.

Für dieses Projekt gelten die Microsoft-Verhaltensregeln für Open Source (Microsoft Open Source Code of Conduct). Weitere Informationen finden Sie in den häufig gestellten Fragen zum Verhaltenskodex. Sie können sich auch an opencode@microsoft.com wenden, wenn Sie weitere Fragen oder Anmerkungen haben.

Aufrufe