Biblioteka klienta usługi Azure EventHubs Checkpoint Store dla języka Python — wersja 1.1.4
korzystanie z obiektów blob usługi Storage
Magazyn punktów kontrolnych usługi Azure EventHubs służy do przechowywania punktów kontrolnych podczas przetwarzania zdarzeń z Azure Event Hubs.
Ten pakiet sklepu Checkpoint działa jako pakiet wtyczki do EventHubConsumerClient
programu . Używa ona obiektu blob usługi Azure Storage jako magazynu trwałego do obsługi punktów kontrolnych i informacji o własności partycji.
Należy pamiętać, że jest to biblioteka asynchronizuj, aby uzyskać informacje o wersji synchronizacji biblioteki klienta usługi Azure EventHubs Checkpoint Store, zapoznaj się z artykułem azure-eventhub-checkpointstoreblob.
Kod | źródłowy Pakiet (PyPi) | Dokumentacja referencyjna interfejsu | API Dokumentacja | usługi Azure EventhubsDokumentacja usługi Azure Storage
Wprowadzenie
Wymagania wstępne
Język Python w wersji 3.6 lub nowszej.
Subskrypcja platformy Microsoft Azure: Aby korzystać z usług platformy Azure, w tym Azure Event Hubs, potrzebna będzie subskrypcja. Jeśli nie masz istniejącego konta platformy Azure, możesz utworzyć konto bezpłatnej wersji próbnej lub skorzystać z korzyści dla subskrybentów MSDN podczas tworzenia konta.
Przestrzeń nazw usługi Event Hubs z centrum zdarzeń: Aby korzystać z Azure Event Hubs, musisz również mieć dostęp do przestrzeni nazw i centrum zdarzeń. Jeśli nie znasz tworzenia zasobów platformy Azure, możesz skorzystać z przewodnika krok po kroku dotyczącego tworzenia centrum zdarzeń przy użyciu Azure Portal. W tym miejscu można również znaleźć szczegółowe instrukcje dotyczące korzystania z interfejsu wiersza polecenia platformy Azure, Azure PowerShell lub szablonów usługi Azure Resource Manager (ARM), aby utworzyć centrum zdarzeń.
Konto usługi Azure Storage: Musisz mieć konto usługi Azure Storage i utworzyć kontener blokowy Azure Blob Storage do przechowywania danych punktu kontrolnego za pomocą obiektów blob. Możesz postępować zgodnie z przewodnikiem dotyczącym tworzenia konta usługi Azure Block Blob Storage.
Instalowanie pakietu
$ pip install azure-eventhub-checkpointstoreblob-aio
Kluczowe pojęcia
Tworzenie punktów kontrolnych
Tworzenie punktów kontrolnych jest procesem, za pomocą którego czytniki oznaczają lub zatwierdzają swoją pozycję w sekwencji zdarzeń partycji. Odpowiedzialność za tworzenie punktów kontrolnych spoczywa na odbiorcy i odbywa się dla każdej partycji w ramach grupy odbiorców. Ta odpowiedzialność oznacza, że dla każdej grupy odbiorców każdy czytnik partycji musi śledzić swoją bieżącą pozycję w strumieniu zdarzeń i może poinformować usługi, gdy uzna, że strumień danych jest pełny. Jeśli czytnik rozłączy się od partycji, po swoim ponownym połączeniu rozpoczyna odczyt punktu kontrolnego, który został wcześniej przesłany przez ostatni czytnik tej partycji w danej grupie odbiorców. Gdy czytnik łączy się, przekazuje przesunięcie do centrum zdarzeń, aby określić lokalizację, w której chcesz rozpocząć odczytywanie. W ten sposób można użyć procesu tworzenia punktów kontrolnych zarówno do oznaczenia zdarzeń jako „ukończone” przez aplikacje podrzędne, jak i zapewnienia odporności zdarzenia na pracę w trybie failover między czytnikami działającymi na różnych komputerach. Istnieje możliwość powrotu do starszych danych przez określenie niższego przesunięcia od tego procesu tworzenia punktów kontrolnych. Dzięki temu mechanizmowi tworzenie punktów kontrolnych zapewnia zarówno odporność na pracę w trybie failover, jak i powtarzanie strumienia zdarzeń.
Przesunięcia numerów & sekwencji
Oba numery sekwencji przesunięcia & odwołują się do pozycji zdarzenia w partycji. Można je traktować jako kursor po stronie klienta. Przesunięcie to numer bajtu zdarzenia. Numer przesunięcia/sekwencji umożliwia użytkownikowi zdarzeń (czytnikowi) określenie punktu w strumieniu zdarzeń, z którego chcą rozpocząć odczytywanie zdarzeń. Można określić sygnaturę czasową, tak aby otrzymywać zdarzenia w kolejce tylko po podanym znaczniku czasu. Odbiorcy są zobowiązani do przechowywania własnych wartości przesunięcia poza usługą Event Hubs. W ramach partycji każde zdarzenie zawiera przesunięcie, numer sekwencji i sygnaturę czasową, gdy została umieszczona w kolejce.
Przykłady
- Tworzenie usługi Azure EventHubs
EventHubConsumerClient
- Korzystanie z zdarzeń przy użyciu elementu
BlobCheckpointStore
Tworzenie EventHubConsumerClient
Najprostszym sposobem utworzenia elementu EventHubConsumerClient
jest użycie parametrów połączenia.
from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")
Aby uzyskać więcej informacji, zapoznaj się z biblioteką EventHubConsumerClient
usługi EventHubs, aby uzyskać więcej informacji.
Używanie zdarzeń przy użyciu punktu kontrolnego BlobCheckpointStore
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'
async def on_event(partition_context, event):
# Put your code here.
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_connection_str,
container_name
)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store,
)
async with client:
await client.receive(on_event)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Używanie BlobCheckpointStore
z inną wersją interfejsu API usługi Azure Storage
Niektóre środowiska mają różne wersje interfejsu API usługi Azure Storage.
BlobCheckpointStore
domyślnie używa interfejsu API usługi Storage w wersji 2019-07-07. Aby używać go w innej wersji, określ api_version
podczas tworzenia BlobCheckpointStore
obiektu.
Rozwiązywanie problemów
Ogólne
Włączenie rejestrowania będzie pomocne w rozwiązywaniu problemów.
Rejestrowanie
- Włącz
azure.eventhub.extensions.checkpointstoreblobaio
rejestrator w celu zbierania śladów z biblioteki. - Włącz
azure.eventhub
rejestrator w celu zbierania śladów z głównej biblioteki azure-eventhub. - Włącz
azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage
rejestrator w celu zbierania śladów z biblioteki obiektów blob usługi Azure Storage. - Włącz
uamqp
rejestrator w celu zbierania śladów z bazowej biblioteki uAMQP. - Włącz śledzenie na poziomie ramek amQP przez ustawienie
logging_enable=True
podczas tworzenia klienta.
Następne kroki
Więcej przykładów kodu
Rozpocznij pracę z przykładami asynchronicznych sklepu EventHubs Checkpoint Store.
- receive_events_using_checkpoint_store_async.py — przykładowe zdarzenie EventHubConsumerClient z magazynem punktów kontrolnych obiektów blob
- receive_events_using_checkpoint_store_storage_api_version_async.py — przykład wersji eventHubConsumerClient z magazynem punktów kontrolnych obiektów blob i wersją magazynu
Dokumentacja
Dokumentacja referencyjna jest dostępna tutaj.
Przekazywanie opinii
Jeśli wystąpią jakiekolwiek usterki lub masz sugestie, zgłoś problem w sekcji Problemy w projekcie.
Współtworzenie
W tym projekcie zachęcamy do współtworzenia i zgłaszania sugestii. Współtworzenie w większości przypadków wymaga zgody na umowę licencyjną dotyczącą współautorów (CLA, Contributor License Agreement), zgodnie z którą współautor ma prawo udzielić i faktycznie udziela nam praw do używania wytworzonej przez siebie zawartości. Aby uzyskać szczegółowe informacje, odwiedź stronę https://cla.microsoft.com.
Po przesłaniu żądania ściągnięcia robot CLA automatycznie określi, czy musisz przekazać umowę CLA, i doda odpowiednie informacje do tego żądania (na przykład etykietę czy komentarz). Po prostu postępuj zgodnie z instrukcjami robota. Wystarczy zrobić to raz dla wszystkich repozytoriów, w przypadku których jest używana nasza umowa CLA.
W tym projekcie przyjęto Kodeks postępowania oprogramowania Open Source firmy Microsoft. Aby uzyskać więcej informacji, zobacz Często zadawane pytania dotyczące kodeksu postępowania lub skontaktuj się z opencode@microsoft.com dodatkowymi pytaniami lub komentarzami.
Azure SDK for Python