Delen via


Azure EventHubs Checkpoint Store-clientbibliotheek voor Python - versie 1.1.4

storage-blobs gebruiken

Azure EventHubs Checkpoint Store wordt gebruikt voor het opslaan van controlepunten tijdens het verwerken van gebeurtenissen van Azure Event Hubs. Dit Checkpoint Store-pakket werkt als een invoegtoepassingspakket voor EventHubConsumerClient. Azure Storage Blob wordt gebruikt als permanente opslag voor het onderhouden van controlepunten en informatie over het eigendom van partities.

Houd er rekening mee dat dit een asynchrone bibliotheek is. Raadpleeg azure-eventhub-checkpointstoreblob voor de synchronisatieversie van de Azure EventHubs Checkpoint Store-clientbibliotheek.

Broncode | Pakket (PyPi) | API-referentiedocumentatie | Documentatie | voor Azure EventhubsDocumentatie voor Azure Storage

Aan de slag

Vereisten

  • Python 3.6 of hoger.

  • Microsoft Azure-abonnement: Als u Azure-services wilt gebruiken, waaronder Azure Event Hubs, hebt u een abonnement nodig. Als u geen bestaand Azure-account hebt, kunt u zich registreren voor een gratis proefversie of de voordelen van uw MSDN-abonnee gebruiken wanneer u een account maakt.

  • Event Hubs-naamruimte met een Event Hub: Als u wilt communiceren met Azure Event Hubs, moet u ook een naamruimte en Event Hub beschikbaar hebben. Als u niet bekend bent met het maken van Azure-resources, kunt u de stapsgewijze handleiding volgen voor het maken van een Event Hub met behulp van de Azure Portal. Hier vindt u ook gedetailleerde instructies voor het gebruik van de Azure CLI-, Azure PowerShell- of Arm-sjablonen (Azure Resource Manager) om een Event Hub te maken.

  • Azure Storage-account: U moet een Azure Storage-account hebben en een Azure Blob Storage blokcontainer maken om de controlepuntgegevens met blobs op te slaan. U kunt de handleiding voor het maken van een Azure Block Blob Storage-account volgen.

Het pakket installeren

$ pip install azure-eventhub-checkpointstoreblob-aio

Belangrijkste concepten

Controlepunten maken

Het plaatsen van controlepunten is een proces waarbij lezers hun positie binnen de gebeurtenisvolgorde van een partitie markeren of vastleggen. Het plaatsen van controlepunten is de verantwoordelijkheid van de consumer en vindt plaats per partitie binnen een consumergroep. Deze verantwoordelijkheid houdt in dat elke partitielezer voor elke consumergroep de huidige positie in de gebeurtenisstroom moet bijhouden en de service kan informeren wanneer de gegevensstroom is voltooid. Als een lezer van een partitie is losgekoppeld en er vervolgens weer verbinding wordt gemaakt, begint het lezen bij het controlepunt dat eerder is verzonden door de laatste lezer van de betreffende partitie in de consumergroep. Wanneer de lezer verbinding maakt, wordt de offset doorgegeven aan de Event Hub om de locatie op te geven waar moet worden gelezen. Op deze manier kunt u het plaatsen van controlepunten gebruiken om gebeurtenissen te markeren als 'voltooid' door downstream-toepassingen. Bovendien beschikt u met controlepunten over tolerantie bij een failover tussen lezers die op verschillende apparaten worden uitgevoerd. Het is mogelijk om terug te keren naar de oudere gegevens door een lagere offset van dit controlepuntproces op te geven. Via dit mechanisme zorgt het plaatsen van controlepunten voor failover-tolerantie en voor herhaling van gebeurtenisstromen.

Reeksnummers van verschuivingen &

Beide offsetvolgordenummers & verwijzen naar de positie van een gebeurtenis binnen een partitie. U kunt ze zien als een cursor aan de clientzijde. De offset is een bytenummering van de gebeurtenis. Met het offset-/volgnummer kan een gebeurtenisconsumer (lezer) een punt in de gebeurtenisstroom opgeven van waaruit ze gebeurtenissen willen lezen. U kunt een tijdstempel opgeven, zodat u gebeurtenissen alleen ontvangt na het opgegeven tijdstempel. Consumers zijn zelf verantwoordelijk voor het opslaan van hun eigen offsetwaarden buiten de Event Hubs-service. Binnen een partitie bevat elke gebeurtenis een offset, een volgnummer en het tijdstempel van wanneer deze werd onderbroken.

Voorbeelden

Een maken EventHubConsumerClient

De eenvoudigste manier om een EventHubConsumerClient te maken is door een connection string te gebruiken.

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

Voor andere manieren om een EventHubConsumerClientte maken, raadpleegt u de EventHubs-bibliotheek voor meer informatie.

Gebeurtenissen gebruiken met behulp van een BlobCheckpointStore to-do-controlepunt

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Gebruiken BlobCheckpointStore met een andere versie van De Azure Storage Service-API

Sommige omgevingen hebben verschillende versies van de Azure Storage Service-API. BlobCheckpointStore maakt standaard gebruik van de Storage Service API versie 2019-07-07. Als u deze wilt gebruiken voor een andere versie, geeft api_version u op wanneer u het BlobCheckpointStore object maakt.

Problemen oplossen

Algemeen

Het inschakelen van logboekregistratie is handig voor het oplossen van problemen.

Logboekregistratie

  • Logboekregistratie inschakelen azure.eventhub.extensions.checkpointstoreblobaio om traceringen uit de bibliotheek te verzamelen.
  • Schakel azure.eventhub logboekregistratie in om traceringen te verzamelen uit de hoofdbibliotheek van Azure-EventHub.
  • Logboekregistratie inschakelen azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage voor het verzamelen van traceringen uit de Azure Storage Blob-bibliotheek.
  • Schakel uamqp logboekregistratie in om traceringen van de onderliggende uAMQP-bibliotheek te verzamelen.
  • Schakel tracering op AMQP-frameniveau in door in te stellen logging_enable=True bij het maken van de client.

Volgende stappen

Meer voorbeeldcode

Ga aan de slag met onze asynchrone voorbeelden van EventHubs Checkpoint Store.

Documentatie

Referentiedocumentatie is hier beschikbaar.

Feedback geven

Als u fouten tegenkomt of suggesties hebt, kunt u een probleem melden in de sectie Problemen van het project.

Bijdragen

Wij verwelkomen bijdragen en suggesties voor dit project. Voor de meeste bijdragen moet u instemmen met een licentieovereenkomst voor bijdragers (CLA: Contributor License Agreement) waarin u verklaart dat u gerechtigd bent ons het recht te geven uw bijdrage te gebruiken, en dat u dit ook doet. Ga naar https://cla.microsoft.com voor meer informatie.

Wanneer u een pull-aanvraag indient, wordt met een CLA-bot automatisch bepaald of u een CLA moet verschaffen en wordt de pull-aanvraag dienovereenkomstig opgemaakt (bijvoorbeeld met een label of commentaar). Volg gewoon de instructies van de bot. U hoeft dit maar eenmaal te doen voor alle repo's waar gebruik wordt gemaakt van onze CLA.

Op dit project is de Microsoft Open Source Code of Conduct (Microsoft Open Source-gedragscode) van toepassing. Zie de Veelgestelde vragen over de gedragscode voor meer informatie of neem contact op opencode@microsoft.com met eventuele aanvullende vragen of opmerkingen.

Weergaven