Udostępnij za pośrednictwem


Biblioteka klienta usługi Azure EventHubs Checkpoint Store dla języka Python — wersja 1.1.4

korzystanie z obiektów blob usługi Storage

Magazyn punktów kontrolnych usługi Azure EventHubs służy do przechowywania punktów kontrolnych podczas przetwarzania zdarzeń z Azure Event Hubs. Ten pakiet sklepu Checkpoint działa jako pakiet wtyczki do EventHubConsumerClientprogramu . Używa ona obiektu blob usługi Azure Storage jako magazynu trwałego do obsługi punktów kontrolnych i informacji o własności partycji.

Należy pamiętać, że jest to biblioteka asynchronizuj, aby uzyskać informacje o wersji synchronizacji biblioteki klienta usługi Azure EventHubs Checkpoint Store, zapoznaj się z artykułem azure-eventhub-checkpointstoreblob.

Kod | źródłowy Pakiet (PyPi) | Dokumentacja referencyjna interfejsu | API Dokumentacja | usługi Azure EventhubsDokumentacja usługi Azure Storage

Wprowadzenie

Wymagania wstępne

  • Język Python w wersji 3.6 lub nowszej.

  • Subskrypcja platformy Microsoft Azure: Aby korzystać z usług platformy Azure, w tym Azure Event Hubs, potrzebna będzie subskrypcja. Jeśli nie masz istniejącego konta platformy Azure, możesz utworzyć konto bezpłatnej wersji próbnej lub skorzystać z korzyści dla subskrybentów MSDN podczas tworzenia konta.

  • Przestrzeń nazw usługi Event Hubs z centrum zdarzeń: Aby korzystać z Azure Event Hubs, musisz również mieć dostęp do przestrzeni nazw i centrum zdarzeń. Jeśli nie znasz tworzenia zasobów platformy Azure, możesz skorzystać z przewodnika krok po kroku dotyczącego tworzenia centrum zdarzeń przy użyciu Azure Portal. W tym miejscu można również znaleźć szczegółowe instrukcje dotyczące korzystania z interfejsu wiersza polecenia platformy Azure, Azure PowerShell lub szablonów usługi Azure Resource Manager (ARM), aby utworzyć centrum zdarzeń.

  • Konto usługi Azure Storage: Musisz mieć konto usługi Azure Storage i utworzyć kontener blokowy Azure Blob Storage do przechowywania danych punktu kontrolnego za pomocą obiektów blob. Możesz postępować zgodnie z przewodnikiem dotyczącym tworzenia konta usługi Azure Block Blob Storage.

Instalowanie pakietu

$ pip install azure-eventhub-checkpointstoreblob-aio

Kluczowe pojęcia

Tworzenie punktów kontrolnych

Tworzenie punktów kontrolnych jest procesem, za pomocą którego czytniki oznaczają lub zatwierdzają swoją pozycję w sekwencji zdarzeń partycji. Odpowiedzialność za tworzenie punktów kontrolnych spoczywa na odbiorcy i odbywa się dla każdej partycji w ramach grupy odbiorców. Ta odpowiedzialność oznacza, że dla każdej grupy odbiorców każdy czytnik partycji musi śledzić swoją bieżącą pozycję w strumieniu zdarzeń i może poinformować usługi, gdy uzna, że strumień danych jest pełny. Jeśli czytnik rozłączy się od partycji, po swoim ponownym połączeniu rozpoczyna odczyt punktu kontrolnego, który został wcześniej przesłany przez ostatni czytnik tej partycji w danej grupie odbiorców. Gdy czytnik łączy się, przekazuje przesunięcie do centrum zdarzeń, aby określić lokalizację, w której chcesz rozpocząć odczytywanie. W ten sposób można użyć procesu tworzenia punktów kontrolnych zarówno do oznaczenia zdarzeń jako „ukończone” przez aplikacje podrzędne, jak i zapewnienia odporności zdarzenia na pracę w trybie failover między czytnikami działającymi na różnych komputerach. Istnieje możliwość powrotu do starszych danych przez określenie niższego przesunięcia od tego procesu tworzenia punktów kontrolnych. Dzięki temu mechanizmowi tworzenie punktów kontrolnych zapewnia zarówno odporność na pracę w trybie failover, jak i powtarzanie strumienia zdarzeń.

Przesunięcia numerów & sekwencji

Oba numery sekwencji przesunięcia & odwołują się do pozycji zdarzenia w partycji. Można je traktować jako kursor po stronie klienta. Przesunięcie to numer bajtu zdarzenia. Numer przesunięcia/sekwencji umożliwia użytkownikowi zdarzeń (czytnikowi) określenie punktu w strumieniu zdarzeń, z którego chcą rozpocząć odczytywanie zdarzeń. Można określić sygnaturę czasową, tak aby otrzymywać zdarzenia w kolejce tylko po podanym znaczniku czasu. Odbiorcy są zobowiązani do przechowywania własnych wartości przesunięcia poza usługą Event Hubs. W ramach partycji każde zdarzenie zawiera przesunięcie, numer sekwencji i sygnaturę czasową, gdy została umieszczona w kolejce.

Przykłady

Tworzenie EventHubConsumerClient

Najprostszym sposobem utworzenia elementu EventHubConsumerClient jest użycie parametrów połączenia.

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

Aby uzyskać więcej informacji, zapoznaj się z biblioteką EventHubConsumerClientusługi EventHubs, aby uzyskać więcej informacji.

Używanie zdarzeń przy użyciu punktu kontrolnego BlobCheckpointStore

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Używanie BlobCheckpointStore z inną wersją interfejsu API usługi Azure Storage

Niektóre środowiska mają różne wersje interfejsu API usługi Azure Storage. BlobCheckpointStore domyślnie używa interfejsu API usługi Storage w wersji 2019-07-07. Aby używać go w innej wersji, określ api_version podczas tworzenia BlobCheckpointStore obiektu.

Rozwiązywanie problemów

Ogólne

Włączenie rejestrowania będzie pomocne w rozwiązywaniu problemów.

Rejestrowanie

  • Włącz azure.eventhub.extensions.checkpointstoreblobaio rejestrator w celu zbierania śladów z biblioteki.
  • Włącz azure.eventhub rejestrator w celu zbierania śladów z głównej biblioteki azure-eventhub.
  • Włącz azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage rejestrator w celu zbierania śladów z biblioteki obiektów blob usługi Azure Storage.
  • Włącz uamqp rejestrator w celu zbierania śladów z bazowej biblioteki uAMQP.
  • Włącz śledzenie na poziomie ramek amQP przez ustawienie logging_enable=True podczas tworzenia klienta.

Następne kroki

Więcej przykładów kodu

Rozpocznij pracę z przykładami asynchronicznych sklepu EventHubs Checkpoint Store.

Dokumentacja

Dokumentacja referencyjna jest dostępna tutaj.

Przekazywanie opinii

Jeśli wystąpią jakiekolwiek usterki lub masz sugestie, zgłoś problem w sekcji Problemy w projekcie.

Współtworzenie

W tym projekcie zachęcamy do współtworzenia i zgłaszania sugestii. Współtworzenie w większości przypadków wymaga zgody na umowę licencyjną dotyczącą współautorów (CLA, Contributor License Agreement), zgodnie z którą współautor ma prawo udzielić i faktycznie udziela nam praw do używania wytworzonej przez siebie zawartości. Aby uzyskać szczegółowe informacje, odwiedź stronę https://cla.microsoft.com.

Po przesłaniu żądania ściągnięcia robot CLA automatycznie określi, czy musisz przekazać umowę CLA, i doda odpowiednie informacje do tego żądania (na przykład etykietę czy komentarz). Po prostu postępuj zgodnie z instrukcjami robota. Wystarczy zrobić to raz dla wszystkich repozytoriów, w przypadku których jest używana nasza umowa CLA.

W tym projekcie przyjęto Kodeks postępowania oprogramowania Open Source firmy Microsoft. Aby uzyskać więcej informacji, zobacz Często zadawane pytania dotyczące kodeksu postępowania lub skontaktuj się z opencode@microsoft.com dodatkowymi pytaniami lub komentarzami.

Wrażenia