Share via


Clientbibliothek des Azure EventHubs-Prüfpunktspeichers für Python – Version 1.1.4

Verwenden von Speicherblobs

Azure EventHubs Checkpoint Store wird zum Speichern von Prüfpunkten bei der Verarbeitung von Ereignissen aus Azure Event Hubs verwendet. Dieses Checkpoint Store-Paket funktioniert als Plug-In-Paket für EventHubConsumerClient. Es verwendet Azure Storage Blob als beständigen Speicher zum Verwalten von Prüfpunkten und Partitionsbesitzinformationen.

Beachten Sie, dass es sich um eine Synchronisierungsbibliothek handelt. Informationen zur asynchronen Version der Azure EventHubs Checkpoint Store-Clientbibliothek finden Sie unter azure-eventhub-checkpointstoreblob-aio.

Quellcode | Paket (PyPi) | API-Referenzdokumentation | Azure Eventhubs-Dokumentation | Azure Storage-Dokumentation

Erste Schritte

Voraussetzungen

  • Python2.7, Python 3.6 oder höher.

  • Microsoft Azure-Abonnement: Für die Verwendung von Azure-Diensten, einschließlich Azure Event Hubs, benötigen Sie ein Abonnement. Wenn Sie nicht über ein vorhandenes Azure-Konto verfügen, können Sie sich für eine kostenlose Testversion registrieren oder ihre MSDN-Abonnentenvorteile beim Erstellen eines Kontos nutzen.

  • Event Hubs-Namespace mit einem Event Hub: Um mit Azure Event Hubs zu interagieren, müssen Sie auch einen Namespace und Event Hub zur Verfügung haben. Wenn Sie mit dem Erstellen von Azure-Ressourcen nicht vertraut sind, sollten Sie die schrittweise Anleitung zum Erstellen eines Event Hubs mit dem Azure-Portal befolgen. Dort finden Sie auch ausführliche Anweisungen zum Verwenden von Azure CLI-, Azure PowerShell- oder Azure Resource Manager-Vorlagen (ARM) zum Erstellen eines Event Hubs.

  • Azure Storage-Konto: Sie müssen über ein Azure Storage-Konto verfügen und einen Azure Blob Storage Blockcontainer erstellen, um die Prüfpunktdaten mit Blobs zu speichern. Sie können die Anleitung zum Erstellen eines Azure Block Blob Storage-Kontos befolgen.

Installieren des Pakets

$ pip install azure-eventhub-checkpointstoreblob

Wichtige Begriffe

Setzen von Prüfpunkten

Setzen von Prüfpunkten ist ein Vorgang, bei dem Leser ihre Position innerhalb einer Partitionsereignissequenz markieren oder bestätigen. Dies liegt in der Verantwortung des Consumers und erfolgt auf Partitionsbasis innerhalb einer Consumergruppe. Das bedeutet, dass jeder Partitionsleser für jede Consumergruppe seine aktuelle Position im Ereignisstream nachverfolgen muss und den Dienst informieren kann, wenn er den Datenstrom als abgeschlossen betrachtet. Wenn ein Leser die Verbindung zu eine Partition trennt, beginnt nach dem erneuten Herstellen der Verbindung das Lesen bei dem Prüfpunkt, der zuvor durch den letzten Leser dieser Partition in dieser Consumergruppe übermittelt wurde. Wenn der Leser verbunden ist, übergibt er diesen Offset an den Event Hub, um die Position für den nächsten Lesevorgang anzugeben. Auf diese Weise können mithilfe von Prüfpunkten Ereignisse von Downstreamanwendungen als abgeschlossen markiert werden. Darüber hinaus sorgen Prüfpunkte für Resilienz bei einem Failover zwischen Lesern, die auf unterschiedlichen Computern ausgeführt werden. Sie können ältere Daten zurückgeben, indem Sie einen niedrigeren Offset aus diesem Prüfpunktprozess angeben. Durch diesen Mechanismus ermöglicht das Setzen von Prüfpunkten Failoverstabilität und eine erneute Wiedergabe des Ereignisstreams.

Offsets-Sequenznummern &

Beide Offsetsequenznummern & beziehen sich auf die Position eines Ereignisses innerhalb einer Partition. Sie können sie sich als clientseitigen Cursor vorstellen. Der Offset ist eine Nummerierung des Ereignisses in Byte. Mit der Offset-/Sequenznummer kann ein Ereignis-Consumer (Reader) einen Punkt im Ereignisdatenstrom angeben, von dem aus mit dem Lesen von Ereignissen begonnen werden soll. Sie können einen Zeitstempel angeben, sodass Sie Ereignisse empfangen, die erst nach dem angegebenen Zeitstempel in die Warteschlange eingereiht werden. Es liegt in der Verantwortung jedes Consumers, seine eigenen Offsetwerte außerhalb des Event Hubs-Diensts zu speichern. Innerhalb einer Partition enthält jedes Ereignis einen Offset, eine Sequenznummer und den Zeitstempel des Queuierungszeitpunkts.

Beispiele

Erstellen Sie einen EventHubConsumerClient.

Die einfachste Möglichkeit zum Erstellen eines EventHubConsumerClient ist die Verwendung einer Verbindungszeichenfolge.

from azure.eventhub import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

Weitere Details EventHubConsumerClientfinden Sie unter EventHubs-Bibliothek .

Verwenden von Ereignissen mithilfe eines BlobCheckpointStore Prüfpunkts


from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'


def on_event(partition_context, event):
    # Put your code here.
    partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    with client:
        client.receive(on_event)

if __name__ == '__main__':
    main()

Verwenden BlobCheckpointStore mit einer anderen Version der Azure Storage Service-API

Einige Umgebungen verfügen über unterschiedliche Versionen der Azure Storage Service-API. BlobCheckpointStore Verwendet standardmäßig die Storage Service-API-Version 2019-07-07. Wenn Sie es für eine andere Version verwenden möchten, geben api_version Sie beim Erstellen des Objekts an BlobCheckpointStore .

Problembehandlung

Allgemein

Das Aktivieren der Protokollierung ist hilfreich, um Probleme zu beheben.

Protokollierung

  • Aktivieren Sie azure.eventhub.extensions.checkpointstoreblob die Protokollierung, um Ablaufverfolgungen aus der Bibliothek zu sammeln.
  • Aktivieren Sie azure.eventhub die Protokollierung, um Ablaufverfolgungen aus der Hauptbibliothek azure-eventhub zu sammeln.
  • Aktivieren Sie azure.eventhub.extensions.checkpointstoreblob._vendor.storage die Protokollierung, um Ablaufverfolgungen aus der Azure Storage-Blobbibliothek zu sammeln.
  • Aktivieren Sie uamqp die Protokollierung, um Ablaufverfolgungen aus der zugrunde liegenden uAMQP-Bibliothek zu sammeln.
  • Aktivieren Sie die Ablaufverfolgung auf AMQP-Frameebene, indem Sie beim Erstellen des Clients festlegen logging_enable=True .

Nächste Schritte

Weiterer Beispielcode

Erste Schritte mit unseren EventHubs Checkpoint Store-Beispielen.

Dokumentation

Referenzdokumentation finden Sie hier.

Feedback geben

Wenn Fehler auftreten oder Vorschläge vorliegen, melden Sie ein Problem im Abschnitt Probleme des Projekts.

Mitwirken

Beiträge und Vorschläge für dieses Projekt sind willkommen. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. Ausführliche Informationen finden Sie unter https://cla.microsoft.com.

Wenn Sie einen Pull Request (PR) übermitteln, überprüft ein CLA-Bot automatisch, ob Sie eine Lizenzvereinbarung bereitstellen und den PR entsprechend ergänzen müssen (z.B. mit einer Bezeichnung oder einem Kommentar). Führen Sie einfach die Anweisungen des Bots aus. Sie müssen dies nur einmal für alle Repositorys ausführen, die unsere CLA verwenden.

Für dieses Projekt gelten die Microsoft-Verhaltensregeln für Open Source (Microsoft Open Source Code of Conduct). Weitere Informationen finden Sie in den häufig gestellten Fragen zum Verhaltenskodex. Sie können sich auch an opencode@microsoft.com wenden, wenn Sie weitere Fragen oder Anmerkungen haben.

Aufrufe