Rövid útmutató: Események küldése eseményközpontokba vagy események fogadása az eseményközpontokból a Python használatával

Ebben a rövid útmutatóban megtudhatja, hogyan küldhet eseményeket egy eseményközpontba, és hogyan fogadhat eseményeket az azure-eventhub Python-csomag használatával.

Előfeltételek

Ha még nem ismerkedik az Azure Event Hubs szolgáltatásokkal, a rövid útmutató előtt tekintse meg az Event Hubs áttekintését .

A rövid útmutató végrehajtásához győződjön meg arról, hogy rendelkezik a következő előfeltételekkel:

  • Microsoft Azure-előfizetés: Regisztráljon ingyenes próbaverzióra , ha még nem rendelkezik ilyen előfizetéssel.
  • Python 3.8 vagy újabb: Győződjön meg arról, hogy a pip telepítve és frissítve van.
  • Visual Studio Code (ajánlott): Vagy használjon tetszőleges más IDE-t.
  • Event Hubs-névtér és eseményközpont: Kövesse ezt az útmutatót , és hozza létre őket az Azure Portalon.

A csomagok telepítése események küldéséhez

Az Event Hubs Python-csomagjainak telepítéséhez nyisson meg egy parancssort, amelyben a Python szerepel az elérési úton. Módosítsa a könyvtárat arra a mappára, ahol meg szeretné tartani a mintákat.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Az alkalmazás hitelesítése az Azure-ban

Ez a rövid útmutató az Azure Event Hubshoz való csatlakozás két módját mutatja be:

  • Jelszó nélküli. Használja a biztonsági főobjektumát a Microsoft Entra ID-ben és a szerepköralapú hozzáférés-vezérlés (RBAC) alkalmazásával, hogy csatlakozzon egy Event Hubs-névtérhez. Nem kell aggódnia amiatt, hogy a kódban, a konfigurációs fájlban vagy a biztonságos tárolóban, például az Azure Key Vaultban hard-kódolt kapcsolati láncok vannak.
  • Kapcsolati lánc. Kapcsolati sztring használatával csatlakozzon egy Event Hubs-névtérhez. Ha még nem ismerkedik az Azure-sal, könnyebben követheti a kapcsolati sztring lehetőséget.

A jelszó nélküli beállítás használatát ajánljuk valós alkalmazásokban és éles környezetekben. További információ: Service Bus-hitelesítés, engedélyezés és jelszó nélküli kapcsolatok az Azure-szolgáltatásokhoz.

Szerepkörök hozzárendelése a Microsoft Entra-felhasználóhoz

Helyi fejlesztéskor győződjön meg arról, hogy az Azure Event Hubshoz csatlakozó felhasználói fiók rendelkezik a megfelelő engedélyekkel. Az üzenetek küldéséhez és fogadásához szüksége van az Azure Event Hubs adattulajdonosi szerepkörére. A szerepkör hozzárendeléséhez szükség van a felhasználói hozzáférés-rendszergazdai szerepkörre, vagy egy másik szerepkörre, amely tartalmazza a Microsoft.Authorization/roleAssignments/write műveletet. Azure RBAC-szerepköröket rendelhet egy felhasználóhoz az Azure Portal, az Azure CLI vagy az Azure PowerShell használatával. További információ: Az Azure RBAC hatókörének ismertetése .

Az alábbi példa hozzárendeli a szerepkört a Azure Event Hubs Data Owner felhasználói fiókjához, amely teljes hozzáférést biztosít az Azure Event Hubs-erőforrásokhoz. Valós forgatókönyv esetén kövesse a Kisebb jogosultság elvét, hogy a felhasználók csak a biztonságosabb éles környezethez szükséges minimális engedélyeket kapják meg.

Azure beépített szerepkörök az Azure Event Hubshoz

Az Azure Event Hubs esetében a névterek és az összes kapcsolódó erőforrás kezelése az Azure Portalon és az Azure Resource Management API-val már védett az Azure RBAC-modellel. Az Azure az alábbi beépített szerepköröket biztosítja az Event Hubs-névtérhez való hozzáférés engedélyezéséhez:

  • Azure Event Hubs-adattulajdonos: Adathozzáférést tesz lehetővé az Event Hubs-névtérhez és annak entitásaihoz (üzenetsorokhoz, témakörökhöz, előfizetésekhez és szűrőkhöz).
  • Azure Event Hubs-adatküldő: Ezzel a szerepkörrel hozzáférést adhat a feladónak az Event Hubs-névtérhez és annak entitásaihoz.
  • Azure Event Hubs-adat fogadó: Ezzel a szerepkörrel hozzáférést adhat a fogadónak az Event Hubs-névtérhez és annak entitásaihoz.

Ha egyéni szerepkört szeretne létrehozni, tekintse meg az Event Hubs-műveletekhez szükséges jogosultságokat.

Fontos

A szerepkör-hozzárendelés propagálása a legtöbb esetben egy-két percet vesz igénybe az Azure-ban. Ritkán akár nyolc percig is eltarthat. Ha hitelesítési hibákat kap a kód első futtatásakor, várjon néhány percet, és próbálkozzon újra.

  1. Az Azure Portalon keresse meg az Event Hubs-névteret a fő keresősáv vagy a bal oldali navigációs sáv használatával.

  2. Az áttekintési lapon válassza a Hozzáférés-vezérlés (IAM) lehetőséget a bal oldali menüben.

  3. A Hozzáférés-vezérlés (IAM) lapon válassza a Szerepkör-hozzárendelések lapot.

  4. Válassza a +Hozzáadás lehetőséget a felső menüből. Ezután válassza a Szerepkör-hozzárendelés hozzáadása lehetőséget.

    A szerepkör hozzárendelését bemutató képernyőkép.

  5. A keresőmezővel szűrheti az eredményeket a kívánt szerepkörre. Ebben a példában keresse meg Azure Event Hubs Data Owner és válassza ki az egyező eredményt. Ezután válassza a Tovább gombot.

  6. A Hozzáférés hozzárendeléseterületen válassza Felhasználó, csoport vagy szolgáltatási főnévlehetőséget. Ezután válassza a + Tagok kijelölése lehetőséget.

  7. A párbeszédpanelen keresse meg a Microsoft Entra-felhasználónevet (általában a user@domain e-mail-címét). A párbeszédpanel alján válassza a Kiválasztás lehetőséget .

  8. Válassza a Véleményezés + hozzárendelés lehetőséget a végső lapra való ugráshoz. A folyamat befejezéséhez válassza a Véleményezés + hozzárendelés lehetőséget.

Események küldése

Ebben a szakaszban hozzon létre egy Python-szkriptet, amely eseményeket küld a korábban létrehozott eseményközpontba.

  1. Nyissa meg a kedvenc Python-szerkesztőt, például a Visual Studio Code-ot.

  2. Hozzon létre egy send.py nevű szkriptet. Ez a szkript egy eseményköteget küld a korábban létrehozott eseményközpontnak.

  3. Illessze be a következő kódot send.py-be:

    A kódban valós értékekkel cserélje le a következő helyőrzőket:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE – A teljes név a névtér Áttekintés lapján jelenik meg. A formátumnak a következőnek kell lennie: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Az eseményközpont neve.
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        print("Producer client created successfully.") 
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Megjegyzés:

    Példák az események aszinkron módon, kapcsolati sztring használatával történő küldésére egy eseményközpontba, lásd a GitHub send_async.py oldalát. Az itt látható minták az események jelszó nélküli küldésére is alkalmazhatók.

Események fogadása

Ez a rövid útmutató az Azure Blob Storage-t használja ellenőrzőpont-tárolóként. Az ellenőrzőpont-tároló az ellenőrzőpontok (azaz az utolsó olvasási pozíciók) megőrzésére szolgál.

Kövesse ezeket a javaslatokat, amikor az Azure Blob Storage-t ellenőrzőpont-tárolóként használja:

  • Minden fogyasztói csoporthoz használjon külön tárolót. Használhatja ugyanazt a tárfiókot, de csoportonként egy tárolót.
  • Ne használja a tárfiókot semmi máshoz.
  • Ne használja a tárolót semmi máshoz.
  • Hozza létre a tárfiókot az üzembe helyezett alkalmazással azonos régióban. Ha az alkalmazás helyszíni, próbálja meg kiválasztani a lehető legközelebbi régiót.

Az Azure Portal Tárfiók lapján, a Blob szolgáltatás szakaszában győződjön meg arról, hogy a következő beállítások le vannak tiltva.

  • Hierarchikus névtér
  • Blob lágy törlése
  • Verziókezelés

Azure Storage-fiók és blobtároló létrehozása

Hozzon létre egy Azure Storage-fiókot és egy blobtárolót a következő lépések végrehajtásával:

  1. Azure Storage-fiók létrehozása
  2. Blobtároló létrehozása.
  3. Hitelesítés a blobtárolóhoz.

A fogadási kód későbbi használatához mindenképpen rögzítse a kapcsolati sztringet és a tároló nevét.

Helyi fejlesztéskor győződjön meg arról, hogy a blobadatokhoz hozzáférő felhasználói fiók rendelkezik a megfelelő engedélyekkel. A blobadatok olvasásához és írásához a Storage Blob Data Közreműködője szükséges. Ahhoz, hogy önmagának rendelje hozzá ezt a szerepkört, rendelkeznie kell a Felhasználói hozzáférés rendszergazdája szerepkörrel, vagy egy másik szerepkörrel, amely tartalmazza a Microsoft.Authorization/roleAssignments/write műveletet. Azure RBAC-szerepköröket rendelhet egy felhasználóhoz az Azure Portal, az Azure CLI vagy az Azure PowerShell használatával. További információ: Az Azure RBAC hatókörének ismertetése.

Ebben a forgatókönyvben engedélyeket rendel felhasználói fiókodhoz, amely a tárfiókra vonatkozóan terjed ki, követve a minimális jogosultság elvét. Ez a gyakorlat csak a minimálisan szükséges engedélyeket biztosítja a felhasználóknak, és biztonságosabbá teszi a termelési környezeteket.

Az alábbi példa a storage blobadatok közreműködői szerepkörét rendeli hozzá a felhasználói fiókhoz, amely olvasási és írási hozzáférést biztosít a tárfiók blobadataihoz.

Fontos

A szerepkör-hozzárendelés propagálása a legtöbb esetben egy-két percet vesz igénybe az Azure-ban. Ritkán akár nyolc percig is eltarthat. Ha hitelesítési hibákat kap a kód első futtatásakor, várjon néhány percet, és próbálkozzon újra.

  1. Az Azure Portalon keresse meg a tárfiókot a fő keresősávon vagy a bal oldali navigációs sávon.

  2. A tárfiók oldalán válassza a Hozzáférés-vezérlés (IAM) lehetőséget a bal oldali menüben.

  3. A Hozzáférés-vezérlés (IAM) lapon válassza a Szerepkör-hozzárendelések lapot.

  4. Válassza a +Hozzáadás lehetőséget a felső menüből. Ezután válassza a Szerepkör-hozzárendelés hozzáadása lehetőséget.

    A tárfiók-szerepkör hozzárendelését bemutató képernyőkép.

  5. A keresőmezővel szűrheti az eredményeket a kívánt szerepkörre. Ebben a példában keresse meg a Storage Blob Data Contributor. Jelölje ki az egyező eredményt, majd válassza a Következőlehetőséget.

  6. A Hozzáférés hozzárendelése területen válassza a Felhasználó, csoport vagy szolgáltatásnév lehetőséget, majd válassza a + Tagok kijelölése lehetőséget.

  7. A párbeszédpanelen keresse meg a Microsoft Entra-felhasználónevet (általában a user@domain e-mail-címét), majd válassza a Párbeszédpanel alján található Kiválasztás lehetőséget.

  8. Válassza a Véleményezés + hozzárendelés lehetőséget a végső lapra való ugráshoz. A folyamat befejezéséhez válassza a Véleményezés + hozzárendelés lehetőséget.

A csomagok telepítése események fogadásához

A fogadó oldalon telepítenie kell egy vagy több csomagot. Ebben a rövid útmutatóban az Azure Blob Storage használatával őrizheti meg az ellenőrzőpontokat, hogy a program ne olvassa el a már beolvasott eseményeket. Metaadat-ellenőrző pontokat végez a fogadott üzeneteken egy blobban rendszeres időközönként. Ez a megközelítés megkönnyíti az üzenetek fogadását később, ahonnan abbahagyta.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Python-szkript létrehozása események fogadásához

Ebben a szakaszban egy Python-szkriptet hoz létre, amely eseményeket fogad az eseményközpontból:

  1. Nyissa meg a kedvenc Python-szerkesztőt, például a Visual Studio Code-ot.

  2. Hozzon létre egy recv.py nevű szkriptet.

  3. Illessze be a következő kódot a recv.py fájlba:

    A kódban valós értékekkel cserélje le a következő helyőrzőket:

    • BLOB_STORAGE_ACCOUNT_URL - Ennek az értéknek a következő formátumban kell lennie: https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
    • BLOB_CONTAINER_NAME - Az Azure Storage-fiók blobtárolójának neve.
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE – A teljes név a névtér Áttekintés lapján jelenik meg. A formátumnak a következőnek kell lennie: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Az eseményközpont neve.
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Megjegyzés:

    Az események eseményközpontból aszinkron módon, kapcsolati sztring használatával történő fogadására vonatkozó egyéb lehetőségekért tekintse meg a GitHub recv_with_checkpoint_store_async.py oldalát. Az itt látható minták az események jelszó nélküli fogadására is alkalmazhatók.

A fogadóalkalmazás futtatása

  1. Nyisson meg egy parancssoros ablakot.

  2. Futtassa a következő parancsot, és jelentkezzen be az Azure Event Hubs-adattulajdonosi szerepkörhöz az Event Hubs-névtérben és a Storage Blob Data Közreműködő szerepkörben az Azure Storage-fiókban hozzáadott fiókkal.

    az login
    
  3. Váltson a receive.py fájlt tartalmazó mappára, és futtassa a következő parancsot:

    python recv.py
    

A küldő alkalmazás futtatása

  1. Nyisson meg egy parancssoros ablakot.

  2. Futtassa a következő parancsot, és jelentkezzen be az Azure Event Hubs-adattulajdonosi szerepkörhöz az Event Hubs-névtérben és a Storage Blob Data Közreműködő szerepkörben az Azure Storage-fiókban hozzáadott fiókkal.

    az login
    
  3. Váltson a send.py tartalmazó mappára, majd futtassa a következő parancsot:

    python send.py
    

A fogadóablakban meg kell jeleníteni az eseményközpontnak küldött üzeneteket.

Hibaelhárítás

Ha nem látja az eseményeket a fogadóablakban, vagy a kód hibát jelez, próbálkozzon az alábbi hibaelhárítási tippekkel:

  • Ha nem látja a recy.py eredményeit, futtassa többször send.py .

  • Ha a jelszó nélküli kód használatakor "coroutine" hibába ütközik, győződjön meg arról, hogy az importálást a azure.identity.aio-ből végzi.

  • Ha a "Befejezetlen ügyfél-munkamenet" jelenik meg jelszó nélküli kóddal (hitelesítő adatokkal), ügyeljen arra, hogy a hitelesítő adatokat zárja be, amikor befejezte. További információ: Async hitelesítő adatok.

  • Ha jogosultsági hibákat tapasztal a recv.py fájl a tárterület elérésekor, győződjön meg arról, hogy követte az Azure Storage-fiók és egy blobtároló létrehozásának lépéseit, és hozzárendelte a Storage Blob Data Contributor szerepkört a szolgáltatáskerethez.

  • Ha különböző partícióazonosítókkal rendelkező eseményeket kap, ez az eredmény várható. A partíciók egy adatszervezési mechanizmus, amely az alkalmazások használatához szükséges alsóbb rétegbeli párhuzamossághoz kapcsolódik. Az egyes eseményközpontokban található partíciók számának kiválasztása közvetlenül kapcsolódik az egyidejű olvasók várt számához. További információ: További információ a partíciókról.

Következő lépések

Gyors kezdés során aszinkron módon küld és fogad eseményeket. Ha szeretné megtudni, hogyan küldhet és fogadhat eseményeket szinkron módon, lépjen a GitHub sync_samples lapra.

Fedezze fel a további példákat és haladó forgatókönyveket az Azure Event Hubs ügyfélkönyvtár Python-mintáira.