Sdílet prostřednictvím


Rychlý start: Odesílání událostí do centra událostí nebo příjem událostí pomocí Pythonu

V tomto rychlém startu se dozvíte, jak odesílat události do centra událostí a přijímat je z centra událostí pomocí balíčku Python azure-eventhub .

Požadavky

Pokud s Azure Event Hubs teprve začínáte, podívejte se na přehled služby Event Hubs před tímto rychlým startem.

K dokončení tohoto rychlého startu se ujistěte, že máte následující požadavky:

  • Předplatné Microsoft Azure: Pokud ho nemáte, zaregistrujte si bezplatnou zkušební verzi .
  • Python 3.8 nebo novější: Ujistěte se, že je pip nainstalovaný a aktualizovaný.
  • Visual Studio Code (doporučeno): Nebo použijte jakékoli jiné integrované vývojové prostředí (IDE) podle vašeho výběru.
  • Obor názvů služby Event Hubs a centrum událostí: Postupujte podle tohoto průvodce a vytvořte je na webu Azure Portal.

Instalace balíčků pro odesílání událostí

Pokud chcete nainstalovat balíčky Pythonu pro službu Event Hubs, otevřete příkazový řádek s Pythonem v jeho cestě. Změňte adresář na složku, do které chcete zachovat ukázky.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Ověření aplikace v Azure

V tomto rychlém startu se můžete připojit ke službě Azure Event Hubs dvěma způsoby:

  • Bez hesla. Použijte svůj princip zabezpečení v Microsoft Entra ID a řízení přístupu na základě role (RBAC) k připojení k oboru názvů Event Hubs. Nemusíte si dělat starosti s pevně zakódovanými připojovacími řetězci v kódu, v konfiguračním souboru nebo v zabezpečeném úložišti, jako je Azure Key Vault.
  • Připojovací řetězec. Pomocí připojovacího řetězce připojte se k oboru názvů služby Event Hubs. Pokud s Azure začínáte, možná zjistíte, že připojovací řetězec možnost je jednodušší postupovat.

Doporučujeme používat možnost bez hesla v reálných aplikacích a produkčních prostředích. Další informace najdete v tématu Ověřování a autorizace služby Service Bus apřipojení bez hesla pro služby Azure.

Přiřazení rolí uživateli Microsoft Entra

Při místním vývoji se ujistěte, že uživatelský účet, který se připojuje ke službě Azure Event Hubs, má správná oprávnění. K odesílání a příjmu zpráv potřebujete roli Vlastník dat služby Azure Event Hubs . K přiřazení této role potřebujete roli Správce uživatelských přístupů nebo jinou roli, která tuto Microsoft.Authorization/roleAssignments/write akci zahrnuje. Role Azure RBAC můžete uživateli přiřadit pomocí webu Azure Portal, Azure CLI nebo Azure PowerShellu. Další informace najdete na stránce Pochopit rozsah pro Azure RBAC.

Následující příklad přiřadí roli k vašemu uživatelskému Azure Event Hubs Data Owner účtu, který poskytuje úplný přístup k prostředkům služby Azure Event Hubs. V reálném scénáři postupujte podle principu nejnižšího oprávnění , aby uživatelům poskytla pouze minimální oprávnění potřebná pro bezpečnější produkční prostředí.

Předdefinované role Azure pro Azure Event Hubs

Pro Azure Event Hubs je správa oborů názvů a všech souvisejících prostředků prostřednictvím Azure portálu a rozhraní API pro správu prostředků Azure již chráněna pomocí modelu Azure RBAC (Role-Based Access Control). Azure poskytuje následující předdefinované role pro autorizaci přístupu k oboru názvů služby Event Hubs:

  • Vlastník dat služby Azure Event Hubs: Umožňuje přístup k datům v oboru názvů služby Event Hubs a k jeho entitám (frontám, tématům, odběrům a filtrům).
  • Odesílatel dat služby Azure Event Hubs: Pomocí této role dáte odesílateli přístup k oboru názvů služby Event Hubs a jeho entitám.
  • Příjemce dat služby Azure Event Hubs: Pomocí této role dejte příjemci přístup k oboru názvů služby Event Hubs a jeho entitám.

Pokud chcete vytvořit vlastní roli, přečtěte si téma Práva potřebná pro operace služby Event Hubs.

Důležité

Ve většině případů trvá přiřazení role v Azure minutu nebo dvě. Ve výjimečných případech může trvat až osm minut. Pokud při prvním spuštění kódu dojde k chybám ověřování, chvíli počkejte a zkuste to znovu.

  1. Na webu Azure Portal vyhledejte obor názvů služby Event Hubs pomocí hlavního panelu hledání nebo levé navigace.

  2. Na stránce přehledu vyberte v nabídce vlevo řízení přístupu (IAM ).

  3. Na stránce Řízení přístupu (IAM) vyberte kartu Přiřazení rolí.

  4. V horní nabídce vyberte + Přidat . Pak vyberte Přidat přiřazení role.

    Snímek obrazovky znázorňující, jak přiřadit roli

  5. Pomocí vyhledávacího pole vyfiltrujte výsledky podle požadované role. V tomto příkladu vyhledejte Azure Event Hubs Data Owner a vyberte odpovídající výsledek. Pak zvolte Další.

  6. V části Přiřadit přístup vyberte Uživatele, skupinu nebo aplikační objekt. Pak zvolte + Vybrat členy.

  7. V dialogovém okně vyhledejte své uživatelské jméno Microsoft Entra (obvykle vaše user@domain e-mailová adresa). V dolní části dialogového okna zvolte Vybrat .

  8. Výběrem možnosti Zkontrolovat a přiřadit přejděte na poslední stránku. Znovu vyberte Zkontrolovat a přiřadit, abyste proces dokončili.

Odesílání událostí

V této části vytvořte skript Pythonu pro odesílání událostí do centra událostí, které jste vytvořili dříve.

  1. Otevřete oblíbený editor Pythonu, například Visual Studio Code.

  2. Vytvořte skript s názvem send.py. Tento skript odešle dávku událostí do centra událostí, které jste vytvořili dříve.

  3. Do send.py vložte následující kód:

    V kódu použijte skutečné hodnoty k nahrazení následujících zástupných symbolů:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE – Plně kvalifikovaný název se zobrazí na stránce Přehled oboru názvů. Měl by být ve formátu: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME – Název centra událostí.
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        print("Producer client created successfully.") 
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Poznámka:

    Příklady dalších možností odesílání událostí do centra událostí asynchronně pomocí připojovacího řetězce najdete na stránce send_async.py GitHubu. Zobrazené vzory platí také pro odesílání událostí bez hesla.

Příjem událostí

Tento rychlý start používá úložiště objektů blob v Azure jako úložiště kontrolních bodů. Úložiště kontrolních bodů se používá k zachování kontrolních bodů (to znamená poslední pozice čtení).

Při použití služby Azure Blob Storage jako úložiště kontrolních bodů postupujte podle těchto doporučení:

  • Pro každou skupinu příjemců použijte samostatný kontejner. Můžete použít stejný účet úložiště, ale pro každou skupinu použít jeden kontejner.
  • Nepoužívejte účet úložiště pro nic jiného.
  • Nepoužívejte kontejner pro nic jiného.
  • Vytvořte účet úložiště ve stejné oblasti jako nasazená aplikace. Pokud je aplikace místní, zkuste zvolit nejbližší možnou oblast.

Na stránce účtu úložiště na webu Azure Portal v části Blob Service se ujistěte, že jsou zakázaná následující nastavení.

  • Hierarchický obor názvů
  • Dočasné smazání objektů blob
  • Vytváření verzí

Vytvořte účet úložiště Azure a blob kontejner

Vytvořte v něm účet úložiště Azure a kontejner objektů blob pomocí následujícího postupu:

  1. Vytvoření účtu služby Azure Storage
  2. Vytvořte kontejner pro objekty blob.
  3. Ověřte se do kontejneru blobů.

Nezapomeňte zaznamenat připojovací řetězec a název kontejneru pro pozdější použití v kódu pro příjem.

Při místním vývoji se ujistěte, že uživatelský účet, který přistupuje k datům objektů blob, má správná oprávnění. Ke čtení a zápisu dat objektů blob potřebujete Přispěvatel dat objektů blob služby Storage . Abyste mohli tuto roli přiřadit sami sobě, musíte mít přiřazenou roli Správce uživatelských přístupů nebo jinou roli, která zahrnuje akci Microsoft.Authorization/roleAssignments/write . Role Azure RBAC můžete uživateli přiřadit pomocí webu Azure Portal, Azure CLI nebo Azure PowerShellu. Další informace najdete v tématu Vysvětlení rozsahu azure RBAC.

V tomto scénáři přiřadíte oprávnění ke svému uživatelskému účtu na úrovni účtu úložiště, abyste dodrželi princip nejnižších oprávnění. Tento postup poskytuje uživatelům jenom minimální potřebná oprávnění a vytváří bezpečnější produkční prostředí.

Následující příklad přiřadí roli Přispěvatel dat v objektech blob služby Storage k vašemu uživatelskému účtu, který poskytuje přístup ke čtení i zápisu k datům objektů blob v účtu úložiště.

Důležité

Ve většině případů trvá přiřazení role v Azure minutu nebo dvě. Ve výjimečných případech může trvat až osm minut. Pokud při prvním spuštění kódu dojde k chybám ověřování, chvíli počkejte a zkuste to znovu.

  1. Na webu Azure Portal vyhledejte svůj účet úložiště pomocí hlavního panelu hledání nebo levé navigace.

  2. Na stránce účtu úložiště v nabídce vlevo vyberte Řízení přístupu (IAM ).

  3. Na stránce Řízení přístupu (IAM) vyberte kartu Přiřazení rolí.

  4. V horní nabídce vyberte + Přidat . Pak vyberte Přidat přiřazení role.

    Snímek obrazovky znázorňující, jak přiřadit roli účtu úložiště

  5. Pomocí vyhledávacího pole vyfiltrujte výsledky podle požadované role. V tomto příkladu vyhledejte Přispěvatel dat pro úložiště Blob. Vyberte odpovídající výsledek a pak zvolte Další.

  6. V části Přiřadit přístup vyberte Uživatel, skupina nebo instanční objekt a pak zvolte + Vybrat členy.

  7. V dialogovém okně vyhledejte své uživatelské jméno Microsoft Entra (obvykle vaše user@domain e-mailová adresa) a pak v dolní části dialogového okna zvolte Vybrat .

  8. Výběrem možnosti Zkontrolovat a přiřadit přejděte na poslední stránku. Znovu vyberte Zkontrolovat a přiřadit, abyste proces dokončili.

Instalace balíčků pro příjem událostí

Pro přijímající stranu je potřeba nainstalovat jeden nebo více balíčků. V tomto rychlém startu použijete azure Blob Storage k zachování kontrolních bodů, aby program nepřečetl události, které už četl. Provádí kontrolní body metadat u přijatých zpráv v pravidelných intervalech v objektu blob. Tento přístup usnadňuje pozdější přijímání zpráv z místa, kde jste skončili.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Vytvoření skriptu Pythonu pro příjem událostí

V této části vytvoříte skript Pythonu pro příjem událostí z centra událostí:

  1. Otevřete oblíbený editor Pythonu, například Visual Studio Code.

  2. Vytvořte skript s názvem recv.py.

  3. Do recv.py vložte následující kód:

    V kódu použijte skutečné hodnoty k nahrazení následujících zástupných symbolů:

    • BLOB_STORAGE_ACCOUNT_URL – Tato hodnota by měla být ve formátu: https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
    • BLOB_CONTAINER_NAME – Název kontejneru objektů blob v účtu úložiště Azure.
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE – Plně kvalifikovaný název se zobrazí na stránce Přehled oboru názvů. Měl by být ve formátu: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME – Název centra událostí.
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Poznámka:

    Příklady dalších možností pro příjem událostí z centra událostí asynchronně pomocí připojovacího řetězce najdete na stránce recv_with_checkpoint_store_async.py GitHubu. Zobrazené vzory platí také pro příjem událostí bez hesla.

Spuštění aplikace příjemce

  1. Spusťte příkazový řádek.

  2. Spusťte následující příkaz a přihlaste se pomocí účtu, který byl přidán do role Vlastník dat služby Azure Event Hubs v oboru názvů Event Hubs a do role Přispěvatel dat objektů blob služby Azure Storage v účtu úložiště Azure.

    az login
    
  3. Přepněte do složky, která obsahuje soubor receive.py, a spusťte následující příkaz:

    python recv.py
    

Spuštění aplikace odesílatele

  1. Spusťte příkazový řádek.

  2. Spusťte následující příkaz a přihlaste se pomocí účtu, který byl přidán do role Vlastník dat služby Azure Event Hubs v oboru názvů Event Hubs a do role Přispěvatel dat objektů blob služby Azure Storage v účtu úložiště Azure.

    az login
    
  3. Přepněte do složky s send.py a spusťte tento příkaz:

    python send.py
    

V okně příjemce by se měly zobrazit zprávy odeslané do centra událostí.

Řešení problému

Pokud se v okně příjemce nezobrazují události nebo kód hlásí chybu, vyzkoušejte následující tipy pro řešení potíží:

  • Pokud výsledky z recy.py nevidíte, několikrát spusťte send.py .

  • Pokud se při použití kódu bez hesla (s přihlašovacími údaji) zobrazí chyby týkající se korutiny, ujistěte se, že používáte import z azure.identity.aio.

  • Pokud se zobrazí zpráva "Neuzavřená klientská relace" s kódem bez zadání hesla, ale s použitím přihlašovacích údajů, ujistěte se, že po dokončení relaci ukončíte. Další informace najdete v tématu Asynchronní přihlašovací údaje.

  • Pokud se vám při přístupu k úložišti zobrazí chyby autorizace s recv.py, ujistěte se, že jste postupovali podle kroků v Vytvoření účtu úložiště Azure a kontejneru objektů blob a přiřadili roli Přispěvatel dat objektů blob pro úložiště zásadám služby.

  • Pokud obdržíte události s různými ID oddílů, je tento výsledek očekávaný. Oddíly slouží jako mechanismus pro organizaci dat a souvisí se stupněm paralelismu příjmu dat, který vyžadují přijímací aplikace. Počet oddílů v centru událostí přímo souvisí s počtem souběžných čtenářů, které očekáváte. Další informace naleznete v Další informace o oddílech.

Další kroky

V tomto rychlém průvodci jste události asynchronně odeslali a přijímali. Pokud chcete zjistit, jak odesílat a přijímat události synchronně, přejděte na stránku sync_samples GitHubu.

Prozkoumejte další příklady a pokročilé scénáře v klientské knihovně služby Azure Event Hubs pro ukázky Pythonu.