Odesílání nebo příjem událostí z Event Hubs s využitím Pythonu

V tomto rychlém startu se dozvíte, jak odesílat události do centra událostí a přijímat je z centra událostí pomocí balíčku Python azure-eventhub.

Požadavky

Pokud s Azure Event Hubs teprve začínáte, podívejte se na přehled služby Event Hubs před tímto rychlým startem.

K dokončení tohoto rychlého startu potřebujete následující požadavky:

  • Předplatné Microsoft Azure. Pokud chcete používat služby Azure, včetně služby Azure Event Hubs, potřebujete předplatné. Pokud nemáte existující účet Azure, zaregistrujte si bezplatnou zkušební verzi.
  • Python 3.8 nebo novější s nainstalovaným a aktualizovaným pipem
  • Visual Studio Code (doporučeno) nebo jakékoli jiné integrované vývojové prostředí (IDE).
  • Vytvořte obor názvů služby Event Hubs a centrum událostí. Prvním krokem je vytvoření oboru názvů služby Event Hubs pomocí webu Azure Portal a získání přihlašovacích údajů pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku.

Instalace balíčků pro odesílání událostí

Pokud chcete nainstalovat balíčky Pythonu pro službu Event Hubs, otevřete příkazový řádek s Pythonem v jeho cestě. Změňte adresář na složku, do které chcete zachovat ukázky.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Ověření aplikace v Azure

V tomto rychlém startu se dozvíte dva způsoby připojení ke službě Azure Event Hubs: bez hesla a připojovací řetězec. První možnost ukazuje, jak se pomocí objektu zabezpečení v Microsoft Entra ID a řízení přístupu na základě role (RBAC) připojit k oboru názvů služby Event Hubs. Nemusíte se starat o pevně zakódované připojovací řetězec v kódu nebo v konfiguračním souboru nebo v zabezpečeném úložišti, jako je Azure Key Vault. Druhá možnost ukazuje, jak se pomocí připojovací řetězec připojit k oboru názvů služby Event Hubs. Pokud s Azure začínáte, můžete najít připojovací řetězec možnost, která se snadněji sleduje. Doporučujeme používat možnost bez hesla v reálných aplikacích a produkčních prostředích. Další informace najdete v tématu Ověřování a autorizace. Další informace o ověřování bez hesla najdete na stránce přehledu.

Přiřazení rolí uživateli Microsoft Entra

Při místním vývoji se ujistěte, že uživatelský účet, který se připojuje ke službě Azure Event Hubs, má správná oprávnění. K odesílání a přijímání zpráv budete potřebovat roli vlastníka dat služby Azure Event Hubs. K přiřazení této role budete potřebovat roli Uživatelský přístup Správa istrator nebo jinou roli, která tuto akci zahrnujeMicrosoft.Authorization/roleAssignments/write. Role Azure RBAC můžete uživateli přiřadit pomocí webu Azure Portal, Azure CLI nebo Azure PowerShellu. Další informace o dostupných oborech pro přiřazení rolí najdete na stránce přehledu oboru.

Následující příklad přiřadí roli k vašemu uživatelskému Azure Event Hubs Data Owner účtu, který poskytuje úplný přístup k prostředkům služby Azure Event Hubs. V reálném scénáři postupujte podle principu nejnižšího oprávnění , aby uživatelům poskytla pouze minimální oprávnění potřebná pro bezpečnější produkční prostředí.

Předdefinované role Azure pro Azure Event Hubs

Pro Službu Azure Event Hubs je správa oborů názvů a všech souvisejících prostředků prostřednictvím webu Azure Portal a rozhraní API pro správu prostředků Azure již chráněno pomocí modelu Azure RBAC. Azure poskytuje následující předdefinované role Azure pro autorizaci přístupu k oboru názvů služby Event Hubs:

  • Vlastník dat služby Azure Event Hubs: Umožňuje přístup k datům k oboru názvů služby Event Hubs a jeho entitám (fronty, témata, odběry a filtry).
  • Odesílatel dat služby Azure Event Hubs: Pomocí této role dáte odesílateli přístup k oboru názvů služby Event Hubs a jeho entitám.
  • Příjemce dat služby Azure Event Hubs: Pomocí této role dejte příjemci přístup k oboru názvů služby Event Hubs a jeho entitám.

Pokud chcete vytvořit vlastní roli, přečtěte si téma Práva potřebná pro operace služby Event Hubs.

Důležité

Ve většině případů bude trvat minutu nebo dvě, než se přiřazení role rozšíří v Azure. Ve výjimečných případech může trvat až osm minut. Pokud při prvním spuštění kódu dojde k chybám ověřování, chvíli počkejte a zkuste to znovu.

  1. Na webu Azure Portal vyhledejte obor názvů služby Event Hubs pomocí hlavního panelu hledání nebo levé navigace.

  2. Na stránce přehledu vyberte v nabídce vlevo řízení přístupu (IAM ).

  3. Na stránce Řízení přístupu (IAM) vyberte kartu Přiřazení rolí.

  4. V horní nabídce vyberte + Přidat a potom přidejte přiřazení role z výsledné rozevírací nabídky.

    A screenshot showing how to assign a role.

  5. Pomocí vyhledávacího pole vyfiltrujte výsledky podle požadované role. V tomto příkladu vyhledejte Azure Event Hubs Data Owner a vyberte odpovídající výsledek. Pak zvolte Další.

  6. V části Přiřadit přístup vyberte Uživatel, skupina nebo instanční objekt a pak zvolte + Vybrat členy.

  7. V dialogovém okně vyhledejte své uživatelské jméno Microsoft Entra (obvykle vaše user@domain e-mailová adresa) a pak v dolní části dialogového okna zvolte Vybrat .

  8. Vyberte Zkontrolovat a přiřadit přejděte na poslední stránku a pak proces dokončete opětovnou kontrolou a přiřazením .

Odesílání událostí

V této části vytvořte skript Pythonu pro odesílání událostí do centra událostí, které jste vytvořili dříve.

  1. Otevřete oblíbený editor Pythonu, například Visual Studio Code.

  2. Vytvořte skript s názvem send.py. Tento skript odešle dávku událostí do centra událostí, které jste vytvořili dříve.

  3. Do send.py vložte následující kód:

    V kódu použijte skutečné hodnoty k nahrazení následujících zástupných symbolů:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Poznámka:

    Příklady dalších možností asynchronního odesílání událostí do centra událostí pomocí připojovací řetězec najdete na stránce send_async.py GitHubu. Zobrazené vzory platí také pro odesílání událostí bez hesla.

Příjem událostí

V tomto rychlém startu se jako úložiště kontrolních bodů používá úložiště objektů blob v Azure. Úložiště kontrolních bodů se používá k zachování kontrolních bodů (to znamená poslední pozice čtení).

Při používání služby Azure Blob Storage jako úložiště kontrolních bodů postupujte podle těchto doporučení:

  • Pro každou skupinu příjemců použijte samostatný kontejner. Můžete použít stejný účet úložiště, ale pro každou skupinu použít jeden kontejner.
  • Nepoužívejte kontejner pro nic jiného a nepoužívejte účet úložiště pro nic jiného.
  • Účet úložiště by měl být ve stejné oblasti jako nasazená aplikace. Pokud je aplikace místní, zkuste zvolit nejbližší možnou oblast.

Na stránce účtu úložiště na webu Azure Portal v části Blob Service se ujistěte, že jsou zakázaná následující nastavení.

  • Hierarchický obor názvů
  • Obnovitelné odstranění objektu blob
  • Vytváření verzí

Vytvoření účtu úložiště Azure a kontejneru objektů blob

Vytvořte v něm účet úložiště Azure a kontejner objektů blob pomocí následujícího postupu:

  1. Vytvoření účtu služby Azure Storage
  2. Vytvořte kontejner objektů blob.
  3. Ověřte se v kontejneru objektů blob.

Nezapomeňte zaznamenat připojovací řetězec a název kontejneru pro pozdější použití v kódu pro příjem.

Při místním vývoji se ujistěte, že uživatelský účet, který přistupuje k datům objektů blob, má správná oprávnění. K čtení a zápisu dat objektů blob budete potřebovat Přispěvatel dat objektů blob služby Storage. Abyste mohli tuto roli přiřadit sami sobě, musíte mít přiřazenou roli User Access Správa istrator nebo jinou roli, která zahrnuje akci Microsoft.Authorization/roleAssignments/write. Role Azure RBAC můžete uživateli přiřadit pomocí webu Azure Portal, Azure CLI nebo Azure PowerShellu. Další informace o dostupných oborech pro přiřazení rolí najdete na stránce přehledu oboru.

V tomto scénáři přiřadíte oprávnění k vašemu uživatelskému účtu s vymezeným oborem účtu úložiště, abyste postupovali podle zásady nejnižších oprávnění. Tento postup poskytuje uživatelům jenom minimální potřebná oprávnění a vytváří bezpečnější produkční prostředí.

Následující příklad přiřadí roli Přispěvatel dat v objektech blob služby Storage k vašemu uživatelskému účtu, který poskytuje přístup ke čtení i zápisu k datům objektů blob v účtu úložiště.

Důležité

Ve většině případů bude trvat minutu nebo dvě, než se přiřazení role rozšíří v Azure, ale ve výjimečných případech může trvat až osm minut. Pokud při prvním spuštění kódu dojde k chybám ověřování, chvíli počkejte a zkuste to znovu.

  1. Na webu Azure Portal vyhledejte svůj účet úložiště pomocí hlavního panelu hledání nebo levé navigace.

  2. Na stránce přehledu účtu úložiště v nabídce vlevo vyberte Řízení přístupu (IAM ).

  3. Na stránce Řízení přístupu (IAM) vyberte kartu Přiřazení rolí.

  4. V horní nabídce vyberte + Přidat a potom přidejte přiřazení role z výsledné rozevírací nabídky.

    A screenshot showing how to assign a storage account role.

  5. Pomocí vyhledávacího pole vyfiltrujte výsledky podle požadované role. V tomto příkladu vyhledejte Přispěvatel dat objektů blob služby Storage a vyberte odpovídající výsledek a pak zvolte Další.

  6. V části Přiřadit přístup vyberte Uživatel, skupina nebo instanční objekt a pak zvolte + Vybrat členy.

  7. V dialogovém okně vyhledejte své uživatelské jméno Microsoft Entra (obvykle vaše user@domain e-mailová adresa) a pak v dolní části dialogového okna zvolte Vybrat .

  8. Vyberte Zkontrolovat a přiřadit přejděte na poslední stránku a pak proces dokončete opětovnou kontrolou a přiřazením .

Instalace balíčků pro příjem událostí

Pro přijímající stranu je potřeba nainstalovat jeden nebo více balíčků. V tomto rychlém startu použijete Azure Blob Storage k zachování kontrolních bodů, aby program nepřečetl události, které už přečetl. Provádí kontrolní body metadat u přijatých zpráv v pravidelných intervalech v objektu blob. Tento přístup usnadňuje pozdější přijímání zpráv z místa, kde jste skončili.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Vytvoření skriptu Pythonu pro příjem událostí

V této části vytvoříte skript Pythonu pro příjem událostí z centra událostí:

  1. Otevřete oblíbený editor Pythonu, například Visual Studio Code.

  2. Vytvořte skript s názvem recv.py.

  3. Do recv.py vložte následující kód:

    V kódu použijte skutečné hodnoty k nahrazení následujících zástupných symbolů:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Poznámka:

    Příklady dalších možností pro asynchronní příjem událostí z centra událostí pomocí připojovací řetězec najdete na stránce recv_with_checkpoint_store_async.py GitHubu. Zobrazené vzory platí také pro příjem událostí bez hesla.

Spuštění aplikace příjemce

Skript spustíte tak, že otevřete příkazový řádek s Pythonem v jeho cestě a pak spustíte tento příkaz:

python recv.py

Spuštění aplikace odesílatele

Skript spustíte tak, že otevřete příkazový řádek s Pythonem v jeho cestě a pak spustíte tento příkaz:

python send.py

V okně příjemce by se měly zobrazit zprávy odeslané do centra událostí.

Řešení problému

Pokud se v okně příjemce nezobrazují události nebo kód hlásí chybu, vyzkoušejte následující tipy pro řešení potíží:

  • Pokud výsledky z recy.py nevidíte, několikrát spusťte send.py .

  • Pokud se při použití kódu bez hesla (s přihlašovacími údaji) zobrazí chyby týkající se korutiny, ujistěte se, že používáte import z azure.identity.aio.

  • Pokud se zobrazí nezařazená relace klienta s kódem bez hesla (s přihlašovacími údaji), ujistěte se, že po dokončení zavřete přihlašovací údaje. Další informace najdete v tématu Asynchronní přihlašovací údaje.

  • Pokud se při přístupu k úložišti zobrazí chyby autorizace s recv.py , ujistěte se, že jste postupovali podle pokynů v tématu Vytvoření účtu úložiště Azure a kontejneru objektů blob a přiřadili k instančnímu objektu roli Přispěvatel dat objektů blob služby.

  • Pokud obdržíte události s různými ID oddílů, očekává se tento výsledek. Oddíly slouží jako mechanismus pro organizaci dat a souvisí se stupněm paralelismu příjmu dat, který vyžadují přijímací aplikace. Počet oddílů v centru událostí přímo souvisí s počtem souběžných čtenářů, které plánujete mít. Další informace najdete v tématu Další informace o oddílech.

Další kroky

V tomto rychlém startu jste asynchronně odeslali a přijímali události. Pokud chcete zjistit, jak odesílat a přijímat události synchronně, přejděte na stránku sync_samples GitHubu.

Všechny ukázky (synchronní i asynchronní) na GitHubu najdete v klientské knihovně Azure Event Hubs pro ukázky Pythonu.