Azure EventHubs Checkpoint Store-clientbibliotheek voor Python - versie 1.1.4
storage-blobs gebruiken
Azure EventHubs Checkpoint Store wordt gebruikt voor het opslaan van controlepunten tijdens het verwerken van gebeurtenissen van Azure Event Hubs.
Dit Checkpoint Store-pakket werkt als een invoegtoepassingspakket voor EventHubConsumerClient
. Azure Storage Blob wordt gebruikt als permanente opslag voor het onderhouden van controlepunten en informatie over het eigendom van partities.
Houd er rekening mee dat dit een asynchrone bibliotheek is. Raadpleeg azure-eventhub-checkpointstoreblob voor de synchronisatieversie van de Azure EventHubs Checkpoint Store-clientbibliotheek.
Broncode | Pakket (PyPi) | API-referentiedocumentatie | Documentatie | voor Azure EventhubsDocumentatie voor Azure Storage
Aan de slag
Vereisten
Python 3.6 of hoger.
Microsoft Azure-abonnement: Als u Azure-services wilt gebruiken, waaronder Azure Event Hubs, hebt u een abonnement nodig. Als u geen bestaand Azure-account hebt, kunt u zich registreren voor een gratis proefversie of de voordelen van uw MSDN-abonnee gebruiken wanneer u een account maakt.
Event Hubs-naamruimte met een Event Hub: Als u wilt communiceren met Azure Event Hubs, moet u ook een naamruimte en Event Hub beschikbaar hebben. Als u niet bekend bent met het maken van Azure-resources, kunt u de stapsgewijze handleiding volgen voor het maken van een Event Hub met behulp van de Azure Portal. Hier vindt u ook gedetailleerde instructies voor het gebruik van de Azure CLI-, Azure PowerShell- of Arm-sjablonen (Azure Resource Manager) om een Event Hub te maken.
Azure Storage-account: U moet een Azure Storage-account hebben en een Azure Blob Storage blokcontainer maken om de controlepuntgegevens met blobs op te slaan. U kunt de handleiding voor het maken van een Azure Block Blob Storage-account volgen.
Het pakket installeren
$ pip install azure-eventhub-checkpointstoreblob-aio
Belangrijkste concepten
Controlepunten maken
Het plaatsen van controlepunten is een proces waarbij lezers hun positie binnen de gebeurtenisvolgorde van een partitie markeren of vastleggen. Het plaatsen van controlepunten is de verantwoordelijkheid van de consumer en vindt plaats per partitie binnen een consumergroep. Deze verantwoordelijkheid houdt in dat elke partitielezer voor elke consumergroep de huidige positie in de gebeurtenisstroom moet bijhouden en de service kan informeren wanneer de gegevensstroom is voltooid. Als een lezer van een partitie is losgekoppeld en er vervolgens weer verbinding wordt gemaakt, begint het lezen bij het controlepunt dat eerder is verzonden door de laatste lezer van de betreffende partitie in de consumergroep. Wanneer de lezer verbinding maakt, wordt de offset doorgegeven aan de Event Hub om de locatie op te geven waar moet worden gelezen. Op deze manier kunt u het plaatsen van controlepunten gebruiken om gebeurtenissen te markeren als 'voltooid' door downstream-toepassingen. Bovendien beschikt u met controlepunten over tolerantie bij een failover tussen lezers die op verschillende apparaten worden uitgevoerd. Het is mogelijk om terug te keren naar de oudere gegevens door een lagere offset van dit controlepuntproces op te geven. Via dit mechanisme zorgt het plaatsen van controlepunten voor failover-tolerantie en voor herhaling van gebeurtenisstromen.
Reeksnummers van verschuivingen &
Beide offsetvolgordenummers & verwijzen naar de positie van een gebeurtenis binnen een partitie. U kunt ze zien als een cursor aan de clientzijde. De offset is een bytenummering van de gebeurtenis. Met het offset-/volgnummer kan een gebeurtenisconsumer (lezer) een punt in de gebeurtenisstroom opgeven van waaruit ze gebeurtenissen willen lezen. U kunt een tijdstempel opgeven, zodat u gebeurtenissen alleen ontvangt na het opgegeven tijdstempel. Consumers zijn zelf verantwoordelijk voor het opslaan van hun eigen offsetwaarden buiten de Event Hubs-service. Binnen een partitie bevat elke gebeurtenis een offset, een volgnummer en het tijdstempel van wanneer deze werd onderbroken.
Voorbeelden
- Een Azure EventHubs maken
EventHubConsumerClient
- Gebeurtenissen gebruiken met behulp van een
BlobCheckpointStore
Een maken EventHubConsumerClient
De eenvoudigste manier om een EventHubConsumerClient
te maken is door een connection string te gebruiken.
from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")
Voor andere manieren om een EventHubConsumerClient
te maken, raadpleegt u de EventHubs-bibliotheek voor meer informatie.
Gebeurtenissen gebruiken met behulp van een BlobCheckpointStore
to-do-controlepunt
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'
async def on_event(partition_context, event):
# Put your code here.
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_connection_str,
container_name
)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store,
)
async with client:
await client.receive(on_event)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Gebruiken BlobCheckpointStore
met een andere versie van De Azure Storage Service-API
Sommige omgevingen hebben verschillende versies van de Azure Storage Service-API.
BlobCheckpointStore
maakt standaard gebruik van de Storage Service API versie 2019-07-07. Als u deze wilt gebruiken voor een andere versie, geeft api_version
u op wanneer u het BlobCheckpointStore
object maakt.
Problemen oplossen
Algemeen
Het inschakelen van logboekregistratie is handig voor het oplossen van problemen.
Logboekregistratie
- Logboekregistratie inschakelen
azure.eventhub.extensions.checkpointstoreblobaio
om traceringen uit de bibliotheek te verzamelen. - Schakel
azure.eventhub
logboekregistratie in om traceringen te verzamelen uit de hoofdbibliotheek van Azure-EventHub. - Logboekregistratie inschakelen
azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage
voor het verzamelen van traceringen uit de Azure Storage Blob-bibliotheek. - Schakel
uamqp
logboekregistratie in om traceringen van de onderliggende uAMQP-bibliotheek te verzamelen. - Schakel tracering op AMQP-frameniveau in door in te stellen
logging_enable=True
bij het maken van de client.
Volgende stappen
Meer voorbeeldcode
Ga aan de slag met onze asynchrone voorbeelden van EventHubs Checkpoint Store.
- receive_events_using_checkpoint_store_async.py - Voorbeeld van EventHubConsumerClient met blobcontrolepuntopslag
- receive_events_using_checkpoint_store_storage_api_version_async.py - Voorbeeld van EventHubConsumerClient met blobcontrolepuntopslag en opslagversie
Documentatie
Referentiedocumentatie is hier beschikbaar.
Feedback geven
Als u fouten tegenkomt of suggesties hebt, kunt u een probleem melden in de sectie Problemen van het project.
Bijdragen
Wij verwelkomen bijdragen en suggesties voor dit project. Voor de meeste bijdragen moet u instemmen met een licentieovereenkomst voor bijdragers (CLA: Contributor License Agreement) waarin u verklaart dat u gerechtigd bent ons het recht te geven uw bijdrage te gebruiken, en dat u dit ook doet. Ga naar https://cla.microsoft.com voor meer informatie.
Wanneer u een pull-aanvraag indient, wordt met een CLA-bot automatisch bepaald of u een CLA moet verschaffen en wordt de pull-aanvraag dienovereenkomstig opgemaakt (bijvoorbeeld met een label of commentaar). Volg gewoon de instructies van de bot. U hoeft dit maar eenmaal te doen voor alle repo's waar gebruik wordt gemaakt van onze CLA.
Op dit project is de Microsoft Open Source Code of Conduct (Microsoft Open Source-gedragscode) van toepassing. Zie de Veelgestelde vragen over de gedragscode voor meer informatie of neem contact op opencode@microsoft.com met eventuele aanvullende vragen of opmerkingen.
Azure SDK for Python