Sdílet prostřednictvím


Azure Event Hubs klientské knihovny pro Python – verze 5.11.5

Azure Event Hubs je vysoce škálovatelná služba publikování a odběru, která může ingestovat miliony událostí za sekundu a streamovat je více příjemcům. To vám umožní zpracovávat a analyzovat obrovské objemy dat generovaných připojenými zařízeními a aplikacemi. Jakmile služba Event Hubs shromáždí data, můžete je načítat, transformovat a ukládat pomocí libovolného poskytovatele analýz v reálném čase nebo pomocí adaptérů dávkování a úložiště. Pokud se chcete o Azure Event Hubs dozvědět více, můžete si projít téma Co je Event Hubs?

Klientská knihovna služby Azure Event Hubs umožňuje publikovat a využívat události služby Azure Event Hubs a můžete ji použít k následujícím účelům:

  • Generování telemetrie o vaší aplikaci pro účely business intelligence a diagnostiky
  • Publikování faktů o stavu vaší aplikace, které můžou sledovat zúčastněné strany a používat je jako triggery pro aktivaci akcí
  • Sledování zajímavých operací a interakcí v rámci vašeho podnikového nebo jiného ekosystému, aby spolu mohly komunikovat volně propojené systémy bez nutnosti mezi nimi vytvářet vazby
  • Příjem událostí od jednoho nebo více vydavatelů, jejich transformace tak, aby lépe vyhovovaly potřebám vašeho ekosystému, a následné publikování transformovaných událostí do nového streamu, ve kterém je můžou sledovat příjemci

Zdrojový kód | Balíček (PyPi) | Balíček (Conda) | Referenční dokumentace k | rozhraní APIDokumentace k | produktuVzorky

Začínáme

Požadavky

  • Python 3.7 nebo novější.

  • Předplatné Microsoft Azure: Abyste mohli používat služby Azure, včetně Azure Event Hubs, budete potřebovat předplatné. Pokud nemáte existující účet Azure, můžete si při vytváření účtu zaregistrovat bezplatnou zkušební verzi nebo využít výhody předplatného MSDN.

  • Obor názvů služby Event Hubs s centrem událostí: Pokud chcete pracovat s Azure Event Hubs, musíte mít také k dispozici obor názvů a centrum událostí. Pokud nemáte zkušenosti s vytvářením prostředků Azure, možná budete chtít postupovat podle podrobného průvodce vytvořením centra událostí pomocí Azure Portal. Najdete tam také podrobné pokyny k vytvoření centra událostí pomocí šablon Azure CLI, Azure PowerShell nebo Azure Resource Manager (ARM).

Instalace balíčku

Nainstalujte klientskou knihovnu Azure Event Hubs pro Python pomocí pipu:

$ pip install azure-eventhub

Ověření klienta

Interakce se službou Event Hubs začíná instancí třídy EventHubConsumerClient nebo EventHubProducerClient. K vytvoření instance objektu klienta potřebujete buď název hostitele, přihlašovací údaje SAS/AAD a název centra událostí, nebo připojovací řetězec.

Vytvoření klienta z připojovací řetězec:

Pro klientskou knihovnu Služby Event Hubs pro interakci s centrem událostí je nejjednodušší použít připojovací řetězec, který se automaticky vytvoří při vytváření oboru názvů služby Event Hubs. Pokud neznáte zásady sdíleného přístupu v Azure, možná budete chtít získat připojovací řetězec služby Event Hubs podle podrobného průvodce.

  • Metoda from_connection_string převezme připojovací řetězec formuláře Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> a názvu entity do vaší instance centra událostí. Připojovací řetězec můžete získat z Azure Portal.

Vytvořte klienta pomocí knihovny azure-identity:

Případně můžete použít objekt Credential k ověření prostřednictvím AAD pomocí balíčku azure-identity.

  • Tento konstruktor demonstrovaný v ukázce propojené výše přebírá název hostitele a název entity vaší instance centra událostí a přihlašovací údaje, které implementují protokol TokenCredential . V balíčku azure-identity jsou k dispozici implementace TokenCredential protokolu. Název hostitele je ve formátu <yournamespace.servicebus.windows.net>.
  • Pokud chcete použít typy přihlašovacích údajů poskytované nástrojem azure-identity, nainstalujte balíček: pip install azure-identity
  • Pokud chcete používat asynchronní rozhraní API, musíte nejprve nainstalovat asynchronní přenos, například aiohttp: pip install aiohttp
  • Pokud používáte Azure Active Directory, musí mít objekt zabezpečení přiřazenou roli, která umožňuje přístup ke službě Event Hubs, například roli vlastníka dat Azure Event Hubs. Další informace o použití autorizace Azure Active Directory se službou Event Hubs najdete v související dokumentaci.

Klíčové koncepty

  • EventHubProducerClient je zdrojem telemetrických dat, diagnostických informací, protokolů využití nebo jiných dat protokolů jako součást integrovaného řešení zařízení, aplikace pro mobilní zařízení, herního titulu běžícího na konzoli nebo jiném zařízení, některého klientského nebo serverového obchodního řešení nebo webu.

  • EventHubConsumerClient takové informace přebírá z centra událostí a zpracovává je. Zpracování může zahrnovat agregaci, komplexní výpočty a filtrování. Zpracování může také zahrnovat distribuci nebo ukládání informací nezpracovaným nebo transformovaným způsobem. Příjemci centra událostí jsou často robustní a vysoce škálovatelné součásti infrastruktury platformy s integrovanými analytickými funkcemi, jako jsou Azure Stream Analytics, Apache Spark nebo Apache Storm.

  • Oddíl je seřazená posloupnost událostí, která se nachází v centru událostí. Azure Event Hubs poskytuje streamování zpráv prostřednictvím modelu rozděleného příjemce, ve kterém každý příjemce čte pouze určitou podmnožinu neboli oddíl streamu zpráv. Události, které nově přichází, se zařazují na konec této posloupnosti. Počet oddílů je zadaný v okamžiku vytvoření centra událostí a nelze ho změnit.

  • Skupina příjemců je zobrazení celého centra událostí. Skupiny příjemců umožňují, aby každá z nich měla samostatné zobrazení streamu událostí a mohla stream číst nezávisle vlastním tempem a z vlastní pozice. Na oddílu na skupinu příjemců může být maximálně 5 souběžných čtenářů. Doporučuje se však, aby pro daný oddíl a párování skupin příjemců existoval pouze jeden aktivní příjemce. Každý aktivní čtenář obdrží všechny události ze svého oddílu; Pokud je ve stejném oddílu více čtenářů, obdrží duplicitní události.

Další koncepty a hlubší diskuzi najdete v tématu Funkce služby Event Hubs. Koncepty AMQP jsou také dobře zdokumentované v protokolu OASIS Advanced Messaging Queuing Protocol (AMQP) verze 1.0.

Bezpečnost vlákna

Nezaručujeme, že EventHubProducerClient nebo EventHubConsumerClient jsou bezpečné pro přístup z více vláken. Nedoporučujeme tyto instance opakovaně používat napříč vlákny. Je na spuštěné aplikaci, aby tyto třídy používala způsobem bezpečným pro přístup z více vláken.

Typ datového modelu EventDataBatch není bezpečný z více vláken. Neměl by se sdílet mezi vlákny ani používat souběžně s klientskými metodami.

Příklady

Následující části obsahují několik fragmentů kódu, které pokrývají některé nejběžnější úlohy služby Event Hubs, mezi které patří:

Kontrola centra událostí

Získejte ID oddílů centra událostí.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

Publikování událostí do centra událostí

Použijte metodu create_batch on EventHubProducerClient k vytvoření objektu EventDataBatch , který pak lze odeslat pomocí send_batch metody . Události se můžou přidávat do EventDataBatch metody, add dokud nebude dosaženo maximálního limitu velikosti dávky v bajtech.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

Využívání událostí z centra událostí

Existuje několik způsobů, jak využívat události z EventHubu. Pokud chcete jednoduše aktivovat zpětné volání při přijetí události, EventHubConsumerClient.receive použije se metoda následujícím způsobem:

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

Využívání událostí z centra událostí v dávkách

Zatímco výše uvedený vzorek aktivuje zpětné volání pro každou přijatou zprávu, následující ukázka aktivuje zpětné volání u dávky událostí a pokusí se přijmout číslo najednou.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

Asynchronní publikování událostí do centra událostí

Použijte metodu create_batch on EventHubProducer k vytvoření objektu EventDataBatch , který pak lze odeslat pomocí send_batch metody . Události se můžou přidávat do EventDataBatch metody, add dokud nebude dosaženo maximálního limitu velikosti dávky v bajtech.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

Asynchronní využívání událostí z centra událostí

Tato sada SDK podporuje synchronní i asynchronní kód. K získání, jak je znázorněno ve výše uvedených vzorcích, ale v rámci aio, by bylo potřeba následující:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

Asynchronní využívání událostí z centra událostí v dávkách

Všechny synchronní funkce jsou podporovány i v systému aio. Jak je znázorněno výše pro synchronní příjem dávky, můžete to samé provést v rámci asyncio následujícím způsobem:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

Využívání událostí a ukládání kontrolních bodů pomocí úložiště kontrolních bodů

EventHubConsumerClient je konstruktor vysoké úrovně, který umožňuje přijímat události z více oddílů najednou a vyrovnávat zatížení s ostatními příjemci pomocí stejného centra událostí a skupiny příjemců.

To uživateli také umožňuje sledovat průběh zpracování událostí pomocí kontrolních bodů.

Kontrolní bod představuje poslední úspěšně zpracovanou událost uživatelem z konkrétního oddílu skupiny příjemců v instanci centra událostí. Nástroj EventHubConsumerClient používá instanci nástroje k aktualizaci kontrolních bodů a k ukládání relevantních CheckpointStore informací požadovaných algoritmem vyrovnávání zatížení.

Vyhledejte pypi s předponou azure-eventhub-checkpointstore a vyhledejte balíčky, které to podporují, a použijte implementaci CheckpointStore z jednoho takového balíčku. Mějte na paměti, že jsou k dispozici synchronizační i asynchronní knihovny.

V následujícím příkladu vytvoříme instanci EventHubConsumerClient a použijeme BlobCheckpointStore. Abyste mohli kód spustit, musíte vytvořit účet služby Azure Storage a kontejner objektů blob .

Azure Blob Storage async checkpoint storea Azure Blob Storage Checkpoint Store Sync jsou jednou z CheckpointStore implementací, které poskytujeme a které se Azure Blob Storage používají jako trvalé úložiště.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Použití EventHubConsumerClient pro práci s IoT Hub

Můžete také použít k EventHubConsumerClient práci s IoT Hub. To je užitečné pro příjem telemetrických dat IoT Hub z propojeného centra událostí. Přidružená připojovací řetězec nebude mít odeslané deklarace identity, takže odesílání událostí není možné.

Všimněte si, že připojovací řetězec musí být pro koncový bod kompatibilní s centrem událostí, např. "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"

Existují dva způsoby, jak získat koncový bod kompatibilní se službou Event Hubs:

  • Ručně získejte předdefinované koncové body IoT Hub na webu Azure Portal a získejte z něj příjem.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

Poradce při potížích

azure-eventhubs Podrobnosti o tom, jak diagnostikovat různé scénáře selhání, najdete v průvodci odstraňováním potíží.

Další kroky

Další vzorový kód

Podrobné příklady použití této knihovny k odesílání a přijímání událostí do a ze služby Event Hubs najdete v adresáři samples .

Dokumentace

Referenční dokumentace je k dispozici tady.

Registr schémat a kodér Avro

Sada EventHubs SDK se dobře integruje se službou Registru schémat a službou Avro. Další informace najdete v tématu Sada SDK registru schémat a sada SDK registru schématu Avro Encoder SDK.

Čistý přenos AMQP v Pythonu a podpora zpětné kompatibility

Klientská knihovna Azure Event Hubs je teď založená na čisté implementaci PythonU AMQP. uAMQP byla odebrána jako požadovaná závislost.

uAMQP Použití jako základního přenosu:

  1. Nainstalujte uamqp pomocí pipu.
$ pip install uamqp 
  1. Průchod uamqp_transport=True během výstavby klienta.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

Poznámka: Atribut messageEventDataBatchEventData/, který dříve zveřejnil uamqp.Message, je zastaralý. "Starší" objekty, které EventData.message/EventDataBatch.message vrací, byly zavedeny, aby usnadnily přechod.

Vytváření kolečka uAMQP ze zdroje

Pokud je uAMQP určena k použití jako základní implementace protokolu AMQP pro azure-eventhub, lze u většiny hlavních operačních systémů najít kola uAMQP.

Pokud máte v úmyslu používat uAMQP platformu, pro kterou nejsou k dispozici kola uAMQP, postupujte podle pokynů k instalaci uAMQP ze zdroje.

Zadání zpětné vazby

Pokud narazíte na nějaké chyby nebo máte návrhy, nahlaste problém v části Problémy projektu.

Přispívání

Tento projekt vítá příspěvky a návrhy. Většina příspěvků vyžaduje souhlas s licenční smlouvou s přispěvatelem (CLA), která stanoví, že máte právo udělit nám práva k používání vašeho příspěvku a skutečně tak činíte. Podrobnosti najdete tady: https://cla.microsoft.com

Při odesílání žádosti o přijetí změn robot CLA automaticky určí, jestli je potřeba poskytnout smlouvu CLA, a příslušným způsobem žádost o přijetí změn upraví (např. přidáním jmenovky nebo komentáře). Stačí postupovat podle pokynů robota. Pro všechna úložiště používající naši smlouvu CLA to stačí udělat jenom jednou.

Tento projekt přijal pravidla chování pro Microsoft Open Source. Další informace najdete v nejčastějších dotazech k pravidlům chování nebo kontaktujte s opencode@microsoft.com případnými dalšími dotazy nebo připomínkami.

Imprese