Delen via


Azure Event Hubs clientbibliotheek voor Python - versie 5.11.5

Azure Event Hubs is een uiterst schaalbare service voor publiceren/abonneren waarmee miljoenen gebeurtenissen per seconde kunnen worden opgenomen en naar meerdere consumenten kunnen worden gestreamd. Hiermee kunt u de enorme hoeveelheden gegevens die door uw verbonden apparaten en toepassingen worden geproduceerd, verwerken en analyseren. Zodra Event Hubs de gegevens heeft verzameld, kunt u deze ophalen, transformeren en opslaan met behulp van een realtime analyseprovider of met batchverwerkings-/opslagadapters. Als u meer wilt weten over Azure Event Hubs, kunt u het volgende lezen: Wat is Event Hubs?

Met de Azure Event Hubs-clientbibliotheek kunt u gebeurtenissen van Azure Event Hubs publiceren en gebruiken. Deze kunnen worden gebruikt om:

  • Telemetrie over uw toepassing voor business intelligence en diagnostische doeleinden te verzenden;
  • Feiten te publiceren over de status van uw toepassing die geïnteresseerde partijen kunnen observeren en gebruiken als trigger om actie te ondernemen;
  • Interessante bewerkingen en interacties binnen uw bedrijf of een ander ecosysteem te observeren, zodat losjes gekoppelde systemen met elkaar kunnen communiceren zonder dat ze aan elkaar hoeven te worden gekoppeld;
  • Gebeurtenissen ontvangen van een of meer uitgevers, deze transformeren om beter te voldoen aan de behoeften van uw ecosysteem en de getransformeerde gebeurtenissen vervolgens publiceren naar een nieuwe stroom die gebruikers kunnen observeren.

Broncode | Pakket (PyPi) | Pakket (Conda) | API-referentiedocumentatie | Productdocumentatie | Monsters

Aan de slag

Vereisten

  • Python 3.7 of hoger.

  • Microsoft Azure-abonnement: Als u Azure-services wilt gebruiken, waaronder Azure Event Hubs, hebt u een abonnement nodig. Als u geen bestaand Azure-account hebt, kunt u zich registreren voor een gratis proefversie of de voordelen van uw MSDN-abonnee gebruiken wanneer u een account maakt.

  • Event Hubs-naamruimte met een Event Hub: Als u wilt communiceren met Azure Event Hubs, moet u ook een naamruimte en Event Hub beschikbaar hebben. Als u niet bekend bent met het maken van Azure-resources, kunt u de stapsgewijze handleiding volgen voor het maken van een Event Hub met behulp van de Azure Portal. Daar vindt u ook gedetailleerde instructies voor het gebruik van de Azure CLI-, Azure PowerShell- of Arm-sjablonen (Azure Resource Manager) om een Event Hub te maken.

Het pakket installeren

Installeer de Azure Event Hubs-clientbibliotheek voor Python met pip:

$ pip install azure-eventhub

De client verifiëren

Interactie met Event Hubs begint met een exemplaar van de klasse EventHubConsumerClient of EventHubProducerClient. U hebt de hostnaam, SAS/AAD-referentie en event hubnaam of een verbindingsreeks nodig om het clientobject te instantiëren.

Client maken op basis van verbindingsreeks:

De eenvoudigste manier om de Event Hubs-clientbibliotheek te laten communiceren met een Event Hub, is het gebruik van een verbindingsreeks, die automatisch wordt gemaakt bij het maken van een Event Hubs-naamruimte. Als u niet bekend bent met beleid voor gedeelde toegang in Azure, kunt u de stapsgewijze handleiding volgen om een Event Hubs-verbindingsreeks te krijgen.

  • De from_connection_string methode neemt de verbindingsreeks van het formulier Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> en de naam van de entiteit naar uw Event Hub-exemplaar. U kunt de verbindingsreeks ophalen uit de Azure Portal.

Maak een client met behulp van de azure-identity-bibliotheek:

U kunt ook een credential-object gebruiken om via AAD te verifiëren met het pakket azure-identity.

  • Deze constructor die in het bovenstaande voorbeeld is gedemonstreerd, gebruikt de hostnaam en entiteitsnaam van uw Event Hub-exemplaar en de referentie waarmee het TokenCredential-protocol wordt geïmplementeerd. Er zijn implementaties van het TokenCredential protocol beschikbaar in het azure-identity-pakket. De hostnaam heeft de indeling <yournamespace.servicebus.windows.net>.
  • Als u de referentietypen wilt gebruiken die worden geleverd door azure-identity, installeert u het pakket: pip install azure-identity
  • Als u de asynchrone API wilt gebruiken, moet u bovendien eerst een asynchroon transport installeren, zoals aiohttp: pip install aiohttp
  • Wanneer u Azure Active Directory gebruikt, moet aan uw principal een rol worden toegewezen die toegang tot Event Hubs toestaat, zoals de rol Azure Event Hubs Gegevenseigenaar. Raadpleeg de bijbehorende documentatie voor meer informatie over het gebruik van Azure Active Directory-autorisatie met Event Hubs.

Belangrijkste concepten

  • Een EventHubProducerClient is een bron van telemetriegegevens, diagnostische gegevens, gebruikslogboeken of andere logboekgegevens, als onderdeel van een ingesloten apparaatoplossing, een toepassing voor mobiele apparaten, een gametitel die wordt uitgevoerd op een console of ander apparaat, een zakelijke oplossing op basis van een client of server of een website.

  • Een EventHubConsumerClient haalt dergelijke informatie op uit de Event Hub en verwerkt deze. Verwerking kan bestaan uit aggregatie, complexe berekeningen en filteren. Verwerking kan ook betrekking hebben op distributie of opslag van de informatie op onbewerkte of getransformeerde wijze. Event Hub-gebruikers zijn vaak robuuste en grootschalige platforminfrastructuuronderdelen met ingebouwde analysemogelijkheden, zoals Azure Stream Analytics, Apache Spark of Apache Storm.

  • Een partitie is een geordende reeks gebeurtenissen die wordt gehouden in een Event Hub. Azure Event Hubs biedt berichtstreaming via een gepartitioneerd consumentenpatroon waarbij elke consument alleen een specifieke subset of partitie van de berichtenstroom leest. Als er nieuwere gebeurtenissen plaatsvinden, worden deze toegevoegd aan het einde van deze reeks. Het aantal partities wordt opgegeven op het moment dat een Event Hub wordt gemaakt en kan niet worden gewijzigd.

  • Een consumentengroep is een weergave van een hele Event Hub. Met consumentengroepen kunnen meerdere verbruikende toepassingen elk een afzonderlijke weergave van de gebeurtenisstroom hebben en de stream onafhankelijk in hun eigen tempo en vanuit hun eigen positie lezen. Er kunnen maximaal 5 gelijktijdige lezers op een partitie per consumentengroep zijn; Het wordt echter aanbevolen dat er slechts één actieve consument is voor een bepaalde koppeling tussen partities en consumentengroepen. Elke actieve lezer ontvangt alle gebeurtenissen van zijn partitie; Als er meerdere lezers op dezelfde partitie zijn, ontvangen ze dubbele gebeurtenissen.

Zie Functies van Event Hubs voor meer concepten en diepere discussies. De concepten voor AMQP zijn ook goed gedocumenteerd in OASIS Advanced Messaging Queuing Protocol (AMQP) versie 1.0.

Veiligheid van schroefdraad

We kunnen niet garanderen dat de EventHubProducerClient of EventHubConsumerClient thread-veilig zijn. Het wordt afgeraden om deze exemplaren opnieuw te gebruiken in verschillende threads. Het is aan de actieve toepassing om deze klassen op een thread-veilige manier te gebruiken.

Het gegevensmodeltype EventDataBatch is niet thread-veilig. Het mag niet worden gedeeld tussen threads en mag niet gelijktijdig worden gebruikt met clientmethoden.

Voorbeelden

De volgende secties bevatten verschillende codefragmenten die betrekking hebben op enkele van de meest voorkomende Event Hubs-taken, waaronder:

Een Event Hub inspecteren

Haal de partitie-id's van een Event Hub op.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

Gebeurtenissen publiceren naar een Event Hub

Gebruik de create_batch methode aan EventHubProducerClient om een EventDataBatch -object te maken dat vervolgens kan worden verzonden met behulp van de send_batch -methode. Gebeurtenissen kunnen worden toegevoegd aan de EventDataBatch met behulp van de add methode totdat de maximale batchgroottelimiet in bytes is bereikt.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

Gebeurtenissen van een Event Hub gebruiken

Er zijn meerdere manieren om gebeurtenissen van een EventHub te gebruiken. Als u eenvoudig een callback wilt activeren wanneer een gebeurtenis wordt ontvangen, wordt de EventHubConsumerClient.receive methode als volgt gebruikt:

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

Gebeurtenissen van een Event Hub in batches gebruiken

Terwijl het bovenstaande voorbeeld de callback activeert voor elk bericht wanneer het wordt ontvangen, activeert het volgende voorbeeld de callback voor een batch gebeurtenissen, waarbij wordt geprobeerd een nummer tegelijk te ontvangen.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

Gebeurtenissen asynchroon publiceren naar een Event Hub

Gebruik de create_batch methode aan EventHubProducer om een EventDataBatch -object te maken dat vervolgens kan worden verzonden met behulp van de send_batch -methode. Gebeurtenissen kunnen worden toegevoegd aan de EventDataBatch met behulp van de add methode totdat de maximale batchgroottelimiet in bytes is bereikt.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

Gebeurtenissen van een Event Hub asynchroon gebruiken

Deze SDK ondersteunt zowel synchrone als asynchrone code. Om te ontvangen zoals in de bovenstaande voorbeelden is gedemonstreerd, maar binnen aio, hebt u het volgende nodig:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

Gebeurtenissen van een Event Hub asynchroon in batches gebruiken

Alle synchrone functies worden ook ondersteund in aio. Zoals hierboven is gedemonstreerd voor synchrone batch-ontvangst, kan hetzelfde binnen asynchroon als volgt worden bereikt:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

Gebeurtenissen gebruiken en controlepunten opslaan met behulp van een controlepuntarchief

EventHubConsumerClient is een constructie op hoog niveau waarmee u gebeurtenissen van meerdere partities tegelijk kunt ontvangen en taken kunt verdelen met andere consumenten die dezelfde Event Hub en consumentengroep gebruiken.

Hierdoor kan de gebruiker ook de voortgang bijhouden wanneer gebeurtenissen worden verwerkt met behulp van controlepunten.

Een controlepunt is bedoeld om de laatst verwerkte gebeurtenis door de gebruiker van een bepaalde partitie van een consumentengroep in een Event Hub-exemplaar weer te geven. De EventHubConsumerClient gebruikt een exemplaar van CheckpointStore om controlepunten bij te werken en om de relevante informatie op te slaan die is vereist voor het taakverdelingsalgoritmen.

Zoek pypi met het voorvoegsel azure-eventhub-checkpointstore om pakketten te vinden die dit ondersteunen en gebruik te maken van de CheckpointStore implementatie van een dergelijk pakket. Houd er rekening mee dat zowel synchronisatie- als asynchrone bibliotheken beschikbaar zijn.

In het onderstaande voorbeeld maken we een exemplaar van EventHubConsumerClient en gebruiken we een BlobCheckpointStore. U moet een Azure Storage-account en een blobcontainer maken om de code uit te voeren.

Azure Blob Storage Checkpoint Store Async en Azure Blob Storage Checkpoint Store Sync zijn een van de CheckpointStore implementaties die van toepassing zijn Azure Blob Storage als de permanente opslag.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

EventHubConsumerClient gebruiken om te werken met IoT Hub

U kunt ook gebruiken EventHubConsumerClient om met IoT Hub te werken. Dit is handig voor het ontvangen van telemetriegegevens van IoT Hub van de gekoppelde EventHub. De gekoppelde verbindingsreeks heeft geen verzendclaims, waardoor het verzenden van gebeurtenissen niet mogelijk is.

Houd er rekening mee dat de verbindingsreeks moet zijn voor een Event Hub-compatibel eindpunt, bijvoorbeeld 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"

Er zijn twee manieren om het eindpunt dat compatibel is met Event Hubs te verkrijgen:

  • Haal handmatig de 'ingebouwde eindpunten' van de IoT Hub op in Azure Portal en ontvang hiervan.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

Problemen oplossen

Zie de azure-eventhubsgids voor probleemoplossing voor meer informatie over het diagnosticeren van verschillende foutscenario's.

Volgende stappen

Meer voorbeeldcode

Bekijk de map met voorbeelden voor gedetailleerde voorbeelden van het gebruik van deze bibliotheek voor het verzenden en ontvangen van gebeurtenissen naar/van Event Hubs.

Documentatie

Referentiedocumentatie is hier beschikbaar.

Schemaregister en Avro-encoder

De EventHubs SDK kan goed worden geïntegreerd met de Schema Registry-service en Avro. Raadpleeg Schema Registry SDK en Schema Registry Avro Encoder SDK voor meer informatie.

Pure Python AMQP-transport- en achterwaartse compatibiliteitsondersteuning

De Azure Event Hubs-clientbibliotheek is nu gebaseerd op een pure Python AMQP-implementatie. uAMQP is verwijderd als vereiste afhankelijkheid.

Als u wilt gebruiken uAMQP als het onderliggende transport:

  1. Installeren uamqp met pip.
$ pip install uamqp 
  1. Pass uamqp_transport=True tijdens bouw van de client.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

Opmerking: het message kenmerk op EventData/EventDataBatch, waarmee eerder de uamqp.Messagewerd weergegeven, is afgeschaft. De verouderde objecten die door EventData.message/EventDataBatch.message zijn geretourneerd, zijn geïntroduceerd om de overgang te vergemakkelijken.

UAMQP-wiel bouwen vanuit bron

Als uAMQP is bedoeld om te worden gebruikt als de onderliggende IMPLEMENTATIE van het AMQP-protocol voor azure-eventhub, kunt uAMQP-wielen vinden voor de meeste grote besturingssystemen.

Als u van plan bent te gebruiken uAMQP en u werkt op een platform waarvoor geen uAMQP-wielen zijn opgegeven, volgt u de installatierichtlijnen voor uAMQP om te installeren vanaf de bron.

Feedback geven

Als u fouten tegenkomt of suggesties hebt, kunt u een probleem melden in de sectie Problemen van het project.

Bijdragen

Wij verwelkomen bijdragen en suggesties voor dit project. Voor de meeste bijdragen moet u instemmen met een licentieovereenkomst voor bijdragers (CLA: Contributor License Agreement) waarin u verklaart dat u gerechtigd bent ons het recht te geven uw bijdrage te gebruiken, en dat u dit ook doet. Ga naar https://cla.microsoft.com voor meer informatie.

Wanneer u een pull-aanvraag indient, wordt met een CLA-bot automatisch bepaald of u een CLA moet verschaffen en wordt de pull-aanvraag dienovereenkomstig opgemaakt (bijvoorbeeld met een label of commentaar). Volg gewoon de instructies van de bot. U hoeft dit maar eenmaal te doen voor alle repo's waar gebruik wordt gemaakt van onze CLA.

Op dit project is de Microsoft Open Source Code of Conduct (Microsoft Open Source-gedragscode) van toepassing. Zie de Veelgestelde vragen over de gedragscode voor meer informatie of neem contact op opencode@microsoft.com met eventuele aanvullende vragen of opmerkingen.

Weergaven