Gebeurtenissen verzenden of gebeurtenissen ontvangen uit Event Hubs met behulp van Python

In deze quickstart ziet u hoe u gebeurtenissen kunt verzenden naar en ontvangen van een Event Hub met behulp van het Python-pakket azure-eventhub.

Vereisten

Als u nog geen ervaring hebt met Azure Event Hubs, raadpleegt u het Event Hubs-overzicht voordat u deze quickstart uitvoert.

Voor het voltooien van deze snelstart moet aan de volgende vereisten worden voldaan:

  • Microsoft Azure-abonnement. Als u Azure-services wilt gebruiken, met inbegrip van Azure Event Hubs, hebt u een abonnement nodig. Als u geen bestaand Azure-account hebt, meldt u zich aan voor een gratis proefversie.
  • Python 3.8 of hoger, waarbij PIP is geïnstalleerd en bijgewerkt.
  • Visual Studio Code (aanbevolen) of een andere IDE (Integrated Development Environment).
  • Een Event Hubs-naamruimte en een Event Hub maken. De eerste stap is het gebruik van Azure Portal om een Event Hubs-naamruimte te maken en de beheerreferenties te verkrijgen die uw toepassing nodig heeft om te communiceren met de Event Hub. Volg de procedure in dit artikel om een naamruimte en een Event Hub te maken.

De pakketten installeren om gebeurtenissen te verzenden

Als u de Python-pakketten voor Event Hubs wilt installeren, opent u een opdrachtprompt met Python in het pad. Wijzig de map in de map waarin u de voorbeelden wilt bewaren.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

De app verifiëren bij Azure

In deze quickstart ziet u twee manieren om verbinding te maken met Azure Event Hubs: zonder wachtwoord en verbindingsreeks. De eerste optie laat zien hoe u uw beveiligingsprincipaal gebruikt in Microsoft Entra ID en op rollen gebaseerd toegangsbeheer (RBAC) om verbinding te maken met een Event Hubs-naamruimte. U hoeft zich geen zorgen te maken over in code vastgelegde verbindingsreeks s in uw code of in een configuratiebestand of in een beveiligde opslag, zoals Azure Key Vault. De tweede optie laat zien hoe u een verbindingsreeks gebruikt om verbinding te maken met een Event Hubs-naamruimte. Als u nog geen gebruik hebt gemaakt van Azure, kunt u de verbindingsreeks optie gemakkelijker volgen. We raden u aan de optie zonder wachtwoord te gebruiken in echte toepassingen en productieomgevingen. Zie Verificatie en autorisatie voor meer informatie. U kunt ook meer lezen over verificatie zonder wachtwoord op de overzichtspagina.

Rollen toewijzen aan uw Microsoft Entra-gebruiker

Zorg er bij het lokaal ontwikkelen voor dat het gebruikersaccount dat verbinding maakt met Azure Event Hubs over de juiste machtigingen beschikt. U hebt de rol Azure Event Hubs-gegevenseigenaar nodig om berichten te kunnen verzenden en ontvangen. Als u uzelf deze rol wilt toewijzen, hebt u de rol Gebruikerstoegang Beheer istrator of een andere rol nodig die de Microsoft.Authorization/roleAssignments/write actie bevat. U kunt Azure RBAC-rollen toewijzen aan een gebruiker met behulp van Azure Portal, Azure CLI of Azure PowerShell. Meer informatie over de beschikbare bereiken voor roltoewijzingen op de overzichtspagina van het bereik.

In het volgende voorbeeld wordt de Azure Event Hubs Data Owner rol toegewezen aan uw gebruikersaccount, dat volledige toegang biedt tot Azure Event Hubs-resources. Volg in een echt scenario het principe van minimale bevoegdheden om gebruikers alleen de minimale machtigingen te geven die nodig zijn voor een veiligere productieomgeving.

Ingebouwde Azure-rollen voor Azure Event Hubs

Voor Azure Event Hubs is het beheer van naamruimten en alle gerelateerde resources via Azure Portal en de Azure Resource Management-API al beveiligd met behulp van het Azure RBAC-model. Azure biedt de onderstaande ingebouwde Azure-rollen voor het autoriseren van toegang tot een Event Hubs-naamruimte:

Als u een aangepaste rol wilt maken, raadpleegt u Rechten die vereist zijn voor Event Hubs-bewerkingen.

Belangrijk

In de meeste gevallen duurt het een paar minuten voordat de roltoewijzing is doorgegeven in Azure. In zeldzame gevallen kan het maximaal acht minuten duren. Als u verificatiefouten ontvangt wanneer u de code voor het eerst uitvoert, wacht u even en probeert u het opnieuw.

  1. Zoek in Azure Portal uw Event Hubs-naamruimte met behulp van de hoofdzoekbalk of linkernavigatiebalk.

  2. Selecteer op de overzichtspagina toegangsbeheer (IAM) in het linkermenu.

  3. Selecteer op de pagina Toegangsbeheer (IAM) het tabblad Roltoewijzingen .

  4. Selecteer + Toevoegen in het bovenste menu en voeg vervolgens roltoewijzing toe in de resulterende vervolgkeuzelijst.

    A screenshot showing how to assign a role.

  5. Gebruik het zoekvak om de resultaten te filteren op de gewenste rol. In dit voorbeeld zoekt Azure Event Hubs Data Owner en selecteert u het overeenkomende resultaat. Kies vervolgens Volgende.

  6. Selecteer onder Toegang toewijzen de optie Gebruiker, groep of service-principal en kies vervolgens + Leden selecteren.

  7. Zoek in het dialoogvenster naar uw Microsoft Entra-gebruikersnaam (meestal uw user@domain e-mailadres) en kies Vervolgens onderaan het dialoogvenster Selecteren .

  8. Selecteer Beoordelen + toewijzen om naar de laatste pagina te gaan en vervolgens opnieuw beoordelen en toewijzen om het proces te voltooien.

Gebeurtenissen verzenden

In deze sectie maakt u een Python-script om gebeurtenissen te verzenden naar de Event Hub die u eerder hebt gemaakt.

  1. Open uw favoriete Python-editor, bijvoorbeeld Visual Studio Code.

  2. Maak een script met de naam send.py. Met dit script wordt een batch van gebeurtenissen verzonden naar de Event Hub die u eerder hebt gemaakt.

  3. Plak de volgende code in send.py:

    Gebruik in de code echte waarden om de volgende tijdelijke aanduidingen te vervangen:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Notitie

    Zie de GitHub-pagina send_async.py voor voorbeelden van andere opties voor het asynchroon verzenden van gebeurtenissen naar Event Hub met behulp van een verbindingsreeks. De weergegeven patronen zijn ook van toepassing op het verzenden van gebeurtenissen zonder wachtwoord.

Gebeurtenissen ontvangen

In deze quickstart wordt Azure Blob Storage gebruikt als controlepuntopslag. De controlepuntopslag wordt gebruikt om controlepunten te behouden (dat wil zeggen, de laatste leesposities).

Volg deze aanbevelingen wanneer u Azure Blob Storage gebruikt als controlepuntarchief:

  • Gebruik een afzonderlijke container voor elke consumentengroep. U kunt hetzelfde opslagaccount gebruiken, maar één container per groep gebruiken.
  • Gebruik de container niet voor iets anders en gebruik het opslagaccount niet voor iets anders.
  • Het opslagaccount moet zich in dezelfde regio bevinden als waarin de geïmplementeerde toepassing zich bevindt. Als de toepassing on-premises is, probeert u de dichtstbijzijnde regio te kiezen.

Controleer op de pagina Opslagaccount in Azure Portal in de sectie Blob-service of de volgende instellingen zijn uitgeschakeld.

  • Hiërarchische naamruimte
  • Blob voorlopig verwijderen
  • Versiebeheer

Een Azure-opslagaccount en een blobcontainer maken

Maak een Azure-opslagaccount met daarin een blobcontainer door de volgende stappen te volgen:

  1. Een Azure Storage-account maken
  2. Maak een blobcontainer.
  3. Verifiëren bij de blobcontainer.

Zorg ervoor dat u de verbindingsreeks en de containernaam vastlegt voor later gebruik in de receive-code.

Zorg er bij het lokaal ontwikkelen voor dat het gebruikersaccount dat toegang heeft tot blobgegevens over de juiste machtigingen beschikt. U hebt Inzender voor Opslagblobgegevens nodig om blobgegevens te lezen en schrijven. Als u uzelf deze rol wilt toewijzen, moet u de rol Gebruikerstoegang Beheer istrator of een andere rol met de actie Microsoft.Authorization/roleAssignments/write toegewezen krijgen. U kunt Azure RBAC-rollen toewijzen aan een gebruiker met behulp van Azure Portal, Azure CLI of Azure PowerShell. Meer informatie over de beschikbare bereiken voor roltoewijzingen vindt u op de overzichtspagina van het bereik.

In dit scenario wijst u machtigingen toe aan uw gebruikersaccount, dat is afgestemd op het opslagaccount, om het principe van minimale bevoegdheden te volgen. Deze procedure biedt gebruikers alleen de minimale machtigingen die nodig zijn en maakt veiligere productieomgevingen.

In het volgende voorbeeld wordt de rol Inzender voor opslagblobgegevens toegewezen aan uw gebruikersaccount, dat zowel lees- als schrijftoegang biedt tot blobgegevens in uw opslagaccount.

Belangrijk

In de meeste gevallen duurt het een of twee minuten voordat de roltoewijzing is doorgegeven in Azure, maar in zeldzame gevallen kan het maximaal acht minuten duren. Als u verificatiefouten ontvangt wanneer u de code voor het eerst uitvoert, wacht u even en probeert u het opnieuw.

  1. Zoek uw opslagaccount in Azure Portal met behulp van de hoofdzoekbalk of linkernavigatiebalk.

  2. Selecteer op de overzichtspagina van het opslagaccount toegangsbeheer (IAM) in het menu aan de linkerkant.

  3. Selecteer op de pagina Toegangsbeheer (IAM) het tabblad Roltoewijzingen .

  4. Selecteer + Toevoegen in het bovenste menu en voeg vervolgens roltoewijzing toe in de resulterende vervolgkeuzelijst.

    A screenshot showing how to assign a storage account role.

  5. Gebruik het zoekvak om de resultaten te filteren op de gewenste rol. Zoek in dit voorbeeld naar Inzender voor Opslagblobgegevens en selecteer het overeenkomende resultaat en kies vervolgens Volgende.

  6. Selecteer onder Toegang toewijzen de optie Gebruiker, groep of service-principal en kies vervolgens + Leden selecteren.

  7. Zoek in het dialoogvenster naar uw Microsoft Entra-gebruikersnaam (meestal uw user@domain e-mailadres) en kies Vervolgens onderaan het dialoogvenster Selecteren .

  8. Selecteer Beoordelen + toewijzen om naar de laatste pagina te gaan en vervolgens opnieuw beoordelen en toewijzen om het proces te voltooien.

De pakketten installeren om gebeurtenissen te ontvangen

Voor de ontvangende zijde moet u een of meer pakketten installeren. In deze quickstart gebruikt u Azure Blob-opslag om controlepunten te behouden, zodat met het programma geen reeds gelezen gebeurtenissen worden gelezen. Er worden in een blob regelmatig controlepunten voor metagegevens uitgevoerd voor ontvangen berichten. Met deze aanpak kunt u later eenvoudig berichten blijven ontvangen vanaf de plek waar u was gebleven.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Een Python-script maken om gebeurtenissen te ontvangen

In deze sectie maakt u een Python-script om gebeurtenissen van uw Event Hub te ontvangen:

  1. Open uw favoriete Python-editor, bijvoorbeeld Visual Studio Code.

  2. Maak een script met de naam recv.py.

  3. Plak de volgende code in recv.py:

    Gebruik in de code echte waarden om de volgende tijdelijke aanduidingen te vervangen:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Notitie

    Zie de pagina GitHub recv_with_checkpoint_store_async.py voor voorbeelden van andere opties voor het asynchroon ontvangen van gebeurtenissen van Event Hub met behulp van een verbindingsreeks. De weergegeven patronen zijn ook van toepassing op het ontvangen van gebeurtenissen zonder wachtwoord.

De ontvangende app uitvoeren

U voert het script uit door een opdrachtprompt te openen met Python in het pad en deze opdracht uit te voeren:

python recv.py

De verzendende app uitvoeren

U voert het script uit door een opdrachtprompt te openen met Python in het pad en deze opdracht uit te voeren:

python send.py

In het ontvangstvenster worden de berichten weergegeven die naar de Event Hub zijn verzonden.

Probleemoplossing

Als er geen gebeurtenissen worden weergegeven in het ontvangervenster of als de code een fout rapporteert, kunt u de volgende tips voor probleemoplossing proberen:

  • Als u geen resultaten van recy.py ziet, voert u send.py meerdere keren uit.

  • Als er fouten over 'coroutine' worden weergegeven bij het gebruik van de code zonder wachtwoord (met referenties), controleert u of u importeert uit azure.identity.aio.

  • Als u 'Niet-afgesloten clientsessie' ziet met code zonder wachtwoord (met referenties), moet u de referentie sluiten wanneer u klaar bent. Zie Async-referenties voor meer informatie.

  • Als u autorisatiefouten met recv.py ziet bij het openen van opslag, controleert u of u de stappen in Een Azure-opslagaccount en een blobcontainer hebt gevolgd en de rol Inzender voor opslagblobgegevens hebt toegewezen aan de service-principal.

  • Als u gebeurtenissen met verschillende partitie-id's ontvangt, wordt dit resultaat verwacht. Partities zijn een mechanisme voor gegevensordening. Ze hebben betrekking op de mate van downstreamparallelheid die is vereist bij het gebruik van toepassingen. Het aantal partities in een Event Hub houdt rechtstreeks verband met het aantal verwachte gelijktijdige lezers. Zie Meer informatie over partities voor meer informatie.

Volgende stappen

In deze quickstart hebt u gebeurtenissen asynchroon verzonden en ontvangen. Ga naar de pagina GitHub sync_samples om te leren hoe u gebeurtenissen synchroon verzendt en ontvangt.

Ga naar de Azure Event Hubs-clientbibliotheek voor Python-voorbeelden voor alle voorbeelden (zowel synchrone als asynchrone) in GitHub.