Share via


Azure Event Hubs klientbibliotek för Python – version 5.11.5

Azure Event Hubs är en mycket skalbar publicera-prenumerera-tjänst som kan mata in miljontals händelser per sekund och strömma dem till flera konsumenter. På så sätt kan du bearbeta och analysera de enorma mängder data som produceras av dina anslutna enheter och program. När Event Hubs har samlat in data kan du hämta, transformera och lagra dem med hjälp av valfri realtidsanalysprovider eller med batchbearbetning/lagringskort. Om du vill veta mer om Azure Event Hubs kanske du vill granska: Vad är Event Hubs?

Klientbiblioteket i Azure Event Hubs används till att publicera och förbruka Azure Event Hubs-händelser och kan användas till att:

  • Skicka telemetri om ditt program för Business Intelligence och diagnostiska ändamål.
  • Publicera fakta om tillståndet för ditt program som berörda parter kan observera och använda som utlösare för att vidta åtgärder.
  • Observera intressanta åtgärder och interaktioner som sker i din verksamhet eller andra ekosystem, så att löst kopplade system kan interagera utan att de behöver länkas ihop.
  • Ta emot händelser från en eller flera utgivare, transformera dem för att bättre uppfylla behoven i ditt ekosystem och publicera sedan de transformerade händelserna till en ny ström som konsumenterna kan observera.

| KällkodPaket (PyPi) | Paket (Conda) | API-referensdokumentation | Produktdokumentation | Prover

Komma igång

Förutsättningar

  • Python 3.7 eller senare.

  • Microsoft Azure-prenumeration: Om du vill använda Azure-tjänster, inklusive Azure Event Hubs, behöver du en prenumeration. Om du inte har ett befintligt Azure-konto kan du registrera dig för en kostnadsfri utvärderingsversion eller använda dina MSDN-prenumerantförmåner när du skapar ett konto.

  • Event Hubs-namnrymd med en händelsehubb: Om du vill interagera med Azure Event Hubs måste du också ha ett namnområde och en händelsehubb tillgänglig. Om du inte är bekant med att skapa Azure-resurser kanske du vill följa den stegvisa guiden för att skapa en händelsehubb med hjälp av Azure Portal. Där hittar du även detaljerade instruktioner för hur du använder Azure CLI-, Azure PowerShell- eller Azure Resource Manager-mallar (ARM) för att skapa en händelsehubb.

Installera paketet

Installera Azure Event Hubs-klientbiblioteket för Python med pip:

$ pip install azure-eventhub

Autentisera klienten

Interaktion med Event Hubs börjar med en instans av klassen EventHubConsumerClient eller EventHubProducerClient. Du behöver antingen värdnamnet, SAS/AAD-autentiseringsuppgifterna och händelsehubbens namn eller en anslutningssträng för att instansiera klientobjektet.

Skapa klient från anslutningssträng:

För att Event Hubs-klientbiblioteket ska interagera med en händelsehubb är det enklaste sättet att använda en anslutningssträng, som skapas automatiskt när du skapar ett Event Hubs-namnområde. Om du inte är bekant med principer för delad åtkomst i Azure kanske du vill följa den stegvisa guiden för att få en Event Hubs-anslutningssträng.

  • Metoden from_connection_string tar anslutningssträng av formuläret Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> och entitetsnamnet till din Event Hub-instans. Du kan hämta anslutningssträng från Azure Portal.

Skapa en klient med hjälp av azure-identity-biblioteket:

Alternativt kan man använda ett autentiseringsuppgiftsobjekt för att autentisera via AAD med paketet azure-identity.

  • Den här konstruktorn som visas i exemplet som länkas ovan tar värdnamnet och entitetsnamnet för din Event Hub-instans och autentiseringsuppgifter som implementerar TokenCredential-protokollet . Det finns implementeringar av protokollet TokenCredential i paketet azure-identity. Värdnamnet har formatet <yournamespace.servicebus.windows.net>.
  • Om du vill använda de typer av autentiseringsuppgifter som tillhandahålls av azure-identityinstallerar du paketet: pip install azure-identity
  • Om du vill använda asynkront API måste du dessutom först installera en asynkron transport, till exempel aiohttp: pip install aiohttp
  • När du använder Azure Active Directory måste huvudkontot tilldelas en roll som ger åtkomst till Event Hubs, till exempel rollen Azure Event Hubs dataägare. Mer information om hur du använder Azure Active Directory-auktorisering med Event Hubs finns i den associerade dokumentationen.

Viktiga begrepp

  • En EventHubProducerClient är en källa till telemetridata, diagnostikinformation, användningsloggar eller andra loggdata, som en del av en inbäddad enhetslösning, ett mobilt enhetsprogram, en speltitel som körs på en konsol eller annan enhet, en klient- eller serverbaserad affärslösning eller en webbplats.

  • En EventHubConsumerClient hämtar sådan information från händelsehubben och bearbetar den. Bearbetningen kan omfatta aggregering, komplex beräkning och filtrering. Bearbetningen kan också omfatta distribution eller lagring av informationen på ett obearbetat eller transformerat sätt. Event Hub-konsumenter är ofta robusta och storskaliga plattformsinfrastrukturdelar med inbyggda analysfunktioner som Azure Stream Analytics, Apache Spark eller Apache Storm.

  • En partition är en ordnad sekvens av händelser som lagras i en händelsehubb. Azure Event Hubs tillhandahåller meddelandeströmning via ett partitionerat konsumentmönster där varje konsument endast läser en viss delmängd, eller partition, av meddelandeströmmen. När nya händelser anländer läggs de till i slutet av denna sekvens. Antalet partitioner anges när en händelsehubb skapas och kan inte ändras.

  • En konsumentgrupp är en vy över en hel händelsehubb. Konsumentgrupper gör det möjligt för flera konsumerande program att ha en separat vy över händelseströmmen och att läsa dataströmmen oberoende av varandra i sin egen takt och från sin egen position. Det kan finnas högst 5 samtidiga läsare på en partition per konsumentgrupp. Vi rekommenderar dock att det bara finns en aktiv konsument för en viss partition och parkoppling av konsumentgrupper. Varje aktiv läsare tar emot alla händelser från partitionen. Om det finns flera läsare på samma partition får de duplicerade händelser.

Fler begrepp och djupare diskussion finns i: Event Hubs-funktioner. Dessutom är begreppen för AMQP väl dokumenterade i OASIS Advanced Messaging Queuing Protocol (AMQP) version 1.0.

Trådsäkerhet

Vi garanterar inte att EventHubProducerClient eller EventHubConsumerClient är trådsäkra. Vi rekommenderar inte att du återanvänder dessa instanser över trådar. Det är upp till det program som körs att använda dessa klasser på ett trådsäkert sätt.

Datamodelltypen EventDataBatch är inte trådsäker. Den bör inte delas mellan trådar eller användas samtidigt med klientmetoder.

Exempel

Följande avsnitt innehåller flera kodfragment som täcker några av de vanligaste Event Hubs-uppgifterna, inklusive:

Inspektera en händelsehubb

Hämta partitions-ID:t för en händelsehubb.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

Publicera händelser till en händelsehubb

create_batch Använd metoden på EventHubProducerClient för att skapa ett EventDataBatch objekt som sedan kan skickas med hjälp av send_batch metoden . Händelser kan läggas till med hjälp av EventDataBatchadd metoden tills den maximala batchstorleksgränsen i byte har uppnåtts.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

Använda händelser från en händelsehubb

Det finns flera sätt att använda händelser från en EventHub. För att bara utlösa ett återanrop när en händelse tas emot används EventHubConsumerClient.receive metoden på följande sätt:

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

Använda händelser från en händelsehubb i batchar

Exemplet ovan utlöser återanropet för varje meddelande när det tas emot, men följande exempel utlöser återanropet på en batch med händelser och försöker ta emot ett nummer i taget.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

Publicera händelser till en händelsehubb asynkront

create_batch Använd metoden på EventHubProducer för att skapa ett EventDataBatch objekt som sedan kan skickas med hjälp av send_batch metoden . Händelser kan läggas till med hjälp av EventDataBatchadd metoden tills den maximala batchstorleksgränsen i byte har uppnåtts.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

Använda händelser från en händelsehubb asynkront

Denna SDK stöder både synkron och asynkron kod. För att ta emot som visas i exemplen ovan, men inom aio, skulle man behöva följande:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

Använda händelser från en händelsehubb i batchar asynkront

Alla synkrona funktioner stöds också i aio. Som visas ovan för synkront batchkvitto kan man åstadkomma samma sak inom asynkronisering enligt följande:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

Använda händelser och spara kontrollpunkter med hjälp av ett kontrollpunktsarkiv

EventHubConsumerClient är en konstruktion på hög nivå som gör att du kan ta emot händelser från flera partitioner samtidigt och belastningsutjämning med andra konsumenter som använder samma händelsehubb och konsumentgrupp.

Detta gör det också möjligt för användaren att spåra förloppet när händelser bearbetas med hjälp av kontrollpunkter.

En kontrollpunkt är avsedd att representera den senast bearbetade händelsen av användaren från en viss partition i en konsumentgrupp i en händelsehubbinstans. EventHubConsumerClient använder en instans av CheckpointStore för att uppdatera kontrollpunkter och för att lagra relevant information som krävs av belastningsutjämningsalgoritmen.

Sök pypi med prefixet azure-eventhub-checkpointstore för att hitta paket som stöder detta och använda implementeringen CheckpointStore från ett sådant paket. Observera att både synkroniseringsbibliotek och asynkrona bibliotek tillhandahålls.

I exemplet nedan skapar vi en instans av EventHubConsumerClient och använder en BlobCheckpointStore. Du måste skapa ett Azure Storage-konto och en blobcontainer för att köra koden.

Azure Blob Storage Checkpoint Store Async och Azure Blob Storage Checkpoint Store Sync är en av de CheckpointStore implementeringar vi tillhandahåller som gäller Azure Blob Storage som beständig lagring.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Använda EventHubConsumerClient för att arbeta med IoT Hub

Du kan även använda EventHubConsumerClient för att arbeta med IoT Hub. Detta är användbart för att ta emot telemetridata för IoT Hub från den länkade EventHub. Den associerade anslutningssträng kommer inte att ha skicka anspråk, vilket innebär att det inte går att skicka händelser.

Observera att anslutningssträng måste vara för en Event Hub-kompatibel slutpunkt, t.ex. "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"

Det finns två sätt att hämta den Event Hubs-kompatibla slutpunkten:

  • Hämta de inbyggda slutpunkterna manuellt för IoT Hub i Azure-portalen och ta emot från den.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

Felsökning

Mer information om hur du diagnostiserar olika felscenarier finns i azure-eventhubsfelsökningsguiden .

Nästa steg

Mer exempelkod

Ta en titt på exempelkatalogen för detaljerade exempel på hur du använder det här biblioteket för att skicka och ta emot händelser till och från Event Hubs.

Dokumentation

Referensdokumentation finns här.

Schemaregister och Avro-kodare

EventHubs SDK integreras bra med Schema Registry-tjänsten och Avro. Mer information finns i Schema Registry SDK och Schema Registry Avro Encoder SDK.

Pure Python AMQP Transport and Backward Compatibility Support

Klientbiblioteket Azure Event Hubs baseras nu på en ren Python AMQP-implementering. uAMQP har tagits bort som obligatoriskt beroende.

Så här använder uAMQP du som underliggande transport:

  1. Installera uamqp med pip.
$ pip install uamqp 
  1. Passera uamqp_transport=True under klientkonstruktion.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

Obs! Attributet message för EventData/EventDataBatch, som tidigare exponerade uamqp.Message, har blivit inaktuellt. "Äldre" objekt som returneras av EventData.message/EventDataBatch.message har introducerats för att underlätta övergången.

Skapa uAMQP-hjul från källa

Om uAMQP är avsett att användas som den underliggande AMQP-protokollimplementeringen för azure-eventhubkan uAMQP-hjul hittas för de flesta större operativsystem.

Om du tänker använda uAMQP och du kör på en plattform där uAMQP-hjul inte tillhandahålls följer du uAMQP-installationsvägledningen för att installera från källan.

Ge feedback

Om du stöter på buggar eller har förslag kan du skicka in ett problem i avsnittet Problem i projektet.

Bidra

Det här projektet välkomnar bidrag och förslag. Merparten av bidragen kräver att du godkänner ett licensavtal för bidrag, där du deklarerar att du har behörighet att bevilja oss rättigheten att använda ditt bidrag, och att du dessutom uttryckligen gör så. Mer information finns på https://cla.microsoft.com.

När du skickar en pull-förfrågan avgör en CLA-robot automatiskt om du måste tillhandahålla ett licensavtal för bidrag med lämplig PR (t.ex. etikett eller kommentar). Följ bara robotens anvisningar. Du behöver bara göra detta en gång för alla repor som använder vårt licensavtal för bidrag.

Det här projektet använder sig av Microsofts uppförandekod för öppen källkod. Mer information finns i Vanliga frågor och svar om uppförandekoden eller kontakta opencode@microsoft.com med ytterligare frågor eller kommentarer.

Visningar