Udostępnij za pośrednictwem


Biblioteka klienta usługi Azure EventHubs Checkpoint Store dla języka Python — wersja 1.1.4

korzystanie z obiektów blob usługi Storage

Magazyn punktów kontrolnych usługi Azure EventHubs służy do przechowywania punktów kontrolnych podczas przetwarzania zdarzeń z Azure Event Hubs. Ten pakiet Sklepu Checkpoint działa jako pakiet wtyczek do EventHubConsumerClientprogramu . Usługa Azure Storage Blob jest używana jako magazyn trwały do obsługi punktów kontrolnych i informacji o własności partycji.

Należy pamiętać, że jest to biblioteka synchronizacji, w przypadku wersji asynchronicznego biblioteki klienta magazynu checkpoint usługi Azure EventHubs, zapoznaj się z tematem azure-eventhub-checkpointstoreblob-aio.

Kod | źródłowy Pakiet (PyPi) | Dokumentacja referencyjna interfejsu | API Dokumentacja | usługi Azure EventhubsDokumentacja usługi Azure Storage

Wprowadzenie

Wymagania wstępne

  • Python2.7, Python 3.6 lub nowszy.

  • Subskrypcja platformy Microsoft Azure: Do korzystania z usług platformy Azure, w tym Azure Event Hubs, potrzebna jest subskrypcja. Jeśli nie masz istniejącego konta platformy Azure, możesz utworzyć konto bezpłatnej wersji próbnej lub skorzystać z korzyści subskrybenta MSDN podczas tworzenia konta.

  • Przestrzeń nazw usługi Event Hubs z centrum zdarzeń: Aby korzystać z Azure Event Hubs, musisz również mieć dostępną przestrzeń nazw i centrum zdarzeń. Jeśli nie znasz tworzenia zasobów platformy Azure, możesz skorzystać z przewodnika krok po kroku dotyczącego tworzenia centrum zdarzeń przy użyciu Azure Portal. W tym miejscu można również znaleźć szczegółowe instrukcje dotyczące tworzenia centrum zdarzeń przy użyciu interfejsu wiersza polecenia platformy Azure, Azure PowerShell lub szablonów usługi Azure Resource Manager (ARM).

  • Konto usługi Azure Storage: Musisz mieć konto usługi Azure Storage i utworzyć kontener blokowy Azure Blob Storage do przechowywania danych punktu kontrolnego za pomocą obiektów blob. Możesz skorzystać z przewodnika tworzenia konta usługi Azure Block Blob Storage.

Instalowanie pakietu

$ pip install azure-eventhub-checkpointstoreblob

Kluczowe pojęcia

Tworzenie punktów kontrolnych

Tworzenie punktów kontrolnych jest procesem, za pomocą którego czytniki oznaczają lub zatwierdzają swoją pozycję w sekwencji zdarzeń partycji. Odpowiedzialność za tworzenie punktów kontrolnych spoczywa na odbiorcy i odbywa się dla każdej partycji w ramach grupy odbiorców. Ta odpowiedzialność oznacza, że dla każdej grupy odbiorców każdy czytnik partycji musi śledzić swoją bieżącą pozycję w strumieniu zdarzeń i może poinformować usługi, gdy uzna, że strumień danych jest pełny. Jeśli czytnik rozłączy się od partycji, po swoim ponownym połączeniu rozpoczyna odczyt punktu kontrolnego, który został wcześniej przesłany przez ostatni czytnik tej partycji w danej grupie odbiorców. Gdy czytnik łączy się, przekazuje przesunięcie do centrum zdarzeń, aby określić lokalizację, w której należy rozpocząć odczytywanie. W ten sposób można użyć procesu tworzenia punktów kontrolnych zarówno do oznaczenia zdarzeń jako „ukończone” przez aplikacje podrzędne, jak i zapewnienia odporności zdarzenia na pracę w trybie failover między czytnikami działającymi na różnych komputerach. Istnieje możliwość powrotu do starszych danych przez określenie niższego przesunięcia od tego procesu tworzenia punktów kontrolnych. Dzięki temu mechanizmowi tworzenie punktów kontrolnych zapewnia zarówno odporność na pracę w trybie failover, jak i powtarzanie strumienia zdarzeń.

Przesunięcie numerów & sekwencji

Oba numery sekwencji przesunięcia & odwołują się do pozycji zdarzenia w partycji. Można je traktować jako kursor po stronie klienta. Przesunięcie to numer bajtu zdarzenia. Przesunięcie/numer sekwencji umożliwia odbiorcy zdarzeń (czytnikowi) określenie punktu w strumieniu zdarzeń, z którego mają rozpocząć odczytywanie zdarzeń. Można określić znacznik czasu, tak aby otrzymywać zdarzenia umieszczone w kolejce tylko po danym znaczniku czasu. Odbiorcy są zobowiązani do przechowywania własnych wartości przesunięcia poza usługą Event Hubs. W ramach partycji każde zdarzenie zawiera przesunięcie, numer sekwencji i sygnaturę czasową w momencie jego kolejkowania.

Przykłady

Tworzenie elementu EventHubConsumerClient

Najprostszym sposobem utworzenia elementu EventHubConsumerClient jest użycie parametrów połączenia.

from azure.eventhub import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

Aby uzyskać więcej informacji, zobacz Bibliotekę EventHubConsumerClientusługi EventHubs, aby uzyskać więcej informacji.

Korzystanie ze zdarzeń przy użyciu elementu do utworzenia punktu kontrolnego BlobCheckpointStore


from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'


def on_event(partition_context, event):
    # Put your code here.
    partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    with client:
        client.receive(on_event)

if __name__ == '__main__':
    main()

Używanie BlobCheckpointStore z inną wersją interfejsu API usługi Azure Storage

Niektóre środowiska mają różne wersje interfejsu API usługi Azure Storage. BlobCheckpointStore domyślnie używa interfejsu API usługi Storage w wersji 2019-07-07. Aby używać go w innej wersji, określ api_version podczas tworzenia BlobCheckpointStore obiektu.

Rozwiązywanie problemów

Ogólne

Włączenie rejestrowania będzie pomocne w rozwiązywaniu problemów.

Rejestrowanie

  • Włącz azure.eventhub.extensions.checkpointstoreblob rejestrator do zbierania śladów z biblioteki.
  • Włącz azure.eventhub rejestrator do zbierania śladów z głównej biblioteki azure-eventhub.
  • Włącz azure.eventhub.extensions.checkpointstoreblob._vendor.storage rejestrator do zbierania śladów z biblioteki obiektów blob usługi Azure Storage.
  • Włącz uamqp rejestrator do zbierania śladów z bazowej biblioteki uAMQP.
  • Włącz śledzenie na poziomie ramki protokołu AMQP przez ustawienie logging_enable=True podczas tworzenia klienta.

Następne kroki

Więcej przykładów kodu

Rozpocznij pracę z przykładami sklepu Checkpoint Store w usłudze EventHubs.

Dokumentacja

Dokumentacja referencyjna jest dostępna tutaj.

Przekazywanie opinii

Jeśli napotkasz jakiekolwiek usterki lub masz sugestie, zgłoś problem w sekcji Problemy w projekcie.

Współtworzenie

W tym projekcie zachęcamy do współtworzenia i zgłaszania sugestii. Współtworzenie w większości przypadków wymaga zgody na umowę licencyjną dotyczącą współautorów (CLA, Contributor License Agreement), zgodnie z którą współautor ma prawo udzielić i faktycznie udziela nam praw do używania wytworzonej przez siebie zawartości. Aby uzyskać szczegółowe informacje, odwiedź stronę https://cla.microsoft.com.

Po przesłaniu żądania ściągnięcia robot CLA automatycznie określi, czy musisz przekazać umowę CLA, i doda odpowiednie informacje do tego żądania (na przykład etykietę czy komentarz). Po prostu postępuj zgodnie z instrukcjami robota. Wystarczy zrobić to raz dla wszystkich repozytoriów, w przypadku których jest używana nasza umowa CLA.

W tym projekcie przyjęto Kodeks postępowania oprogramowania Open Source firmy Microsoft. Aby uzyskać więcej informacji, zobacz Często zadawane pytania dotyczące kodeksu postępowania lub skontaktuj się z opencode@microsoft.com dodatkowymi pytaniami lub komentarzami.

Wrażenia