Biblioteka klienta usługi Azure EventHubs Checkpoint Store dla języka Python — wersja 1.1.4
korzystanie z obiektów blob usługi Storage
Magazyn punktów kontrolnych usługi Azure EventHubs służy do przechowywania punktów kontrolnych podczas przetwarzania zdarzeń z Azure Event Hubs.
Ten pakiet Sklepu Checkpoint działa jako pakiet wtyczek do EventHubConsumerClient
programu . Usługa Azure Storage Blob jest używana jako magazyn trwały do obsługi punktów kontrolnych i informacji o własności partycji.
Należy pamiętać, że jest to biblioteka synchronizacji, w przypadku wersji asynchronicznego biblioteki klienta magazynu checkpoint usługi Azure EventHubs, zapoznaj się z tematem azure-eventhub-checkpointstoreblob-aio.
Kod | źródłowy Pakiet (PyPi) | Dokumentacja referencyjna interfejsu | API Dokumentacja | usługi Azure EventhubsDokumentacja usługi Azure Storage
Wprowadzenie
Wymagania wstępne
Python2.7, Python 3.6 lub nowszy.
Subskrypcja platformy Microsoft Azure: Do korzystania z usług platformy Azure, w tym Azure Event Hubs, potrzebna jest subskrypcja. Jeśli nie masz istniejącego konta platformy Azure, możesz utworzyć konto bezpłatnej wersji próbnej lub skorzystać z korzyści subskrybenta MSDN podczas tworzenia konta.
Przestrzeń nazw usługi Event Hubs z centrum zdarzeń: Aby korzystać z Azure Event Hubs, musisz również mieć dostępną przestrzeń nazw i centrum zdarzeń. Jeśli nie znasz tworzenia zasobów platformy Azure, możesz skorzystać z przewodnika krok po kroku dotyczącego tworzenia centrum zdarzeń przy użyciu Azure Portal. W tym miejscu można również znaleźć szczegółowe instrukcje dotyczące tworzenia centrum zdarzeń przy użyciu interfejsu wiersza polecenia platformy Azure, Azure PowerShell lub szablonów usługi Azure Resource Manager (ARM).
Konto usługi Azure Storage: Musisz mieć konto usługi Azure Storage i utworzyć kontener blokowy Azure Blob Storage do przechowywania danych punktu kontrolnego za pomocą obiektów blob. Możesz skorzystać z przewodnika tworzenia konta usługi Azure Block Blob Storage.
Instalowanie pakietu
$ pip install azure-eventhub-checkpointstoreblob
Kluczowe pojęcia
Tworzenie punktów kontrolnych
Tworzenie punktów kontrolnych jest procesem, za pomocą którego czytniki oznaczają lub zatwierdzają swoją pozycję w sekwencji zdarzeń partycji. Odpowiedzialność za tworzenie punktów kontrolnych spoczywa na odbiorcy i odbywa się dla każdej partycji w ramach grupy odbiorców. Ta odpowiedzialność oznacza, że dla każdej grupy odbiorców każdy czytnik partycji musi śledzić swoją bieżącą pozycję w strumieniu zdarzeń i może poinformować usługi, gdy uzna, że strumień danych jest pełny. Jeśli czytnik rozłączy się od partycji, po swoim ponownym połączeniu rozpoczyna odczyt punktu kontrolnego, który został wcześniej przesłany przez ostatni czytnik tej partycji w danej grupie odbiorców. Gdy czytnik łączy się, przekazuje przesunięcie do centrum zdarzeń, aby określić lokalizację, w której należy rozpocząć odczytywanie. W ten sposób można użyć procesu tworzenia punktów kontrolnych zarówno do oznaczenia zdarzeń jako „ukończone” przez aplikacje podrzędne, jak i zapewnienia odporności zdarzenia na pracę w trybie failover między czytnikami działającymi na różnych komputerach. Istnieje możliwość powrotu do starszych danych przez określenie niższego przesunięcia od tego procesu tworzenia punktów kontrolnych. Dzięki temu mechanizmowi tworzenie punktów kontrolnych zapewnia zarówno odporność na pracę w trybie failover, jak i powtarzanie strumienia zdarzeń.
Przesunięcie numerów & sekwencji
Oba numery sekwencji przesunięcia & odwołują się do pozycji zdarzenia w partycji. Można je traktować jako kursor po stronie klienta. Przesunięcie to numer bajtu zdarzenia. Przesunięcie/numer sekwencji umożliwia odbiorcy zdarzeń (czytnikowi) określenie punktu w strumieniu zdarzeń, z którego mają rozpocząć odczytywanie zdarzeń. Można określić znacznik czasu, tak aby otrzymywać zdarzenia umieszczone w kolejce tylko po danym znaczniku czasu. Odbiorcy są zobowiązani do przechowywania własnych wartości przesunięcia poza usługą Event Hubs. W ramach partycji każde zdarzenie zawiera przesunięcie, numer sekwencji i sygnaturę czasową w momencie jego kolejkowania.
Przykłady
- Tworzenie usługi Azure EventHubs
EventHubConsumerClient
- Korzystanie ze zdarzeń przy użyciu elementu
BlobCheckpointStore
Tworzenie elementu EventHubConsumerClient
Najprostszym sposobem utworzenia elementu EventHubConsumerClient
jest użycie parametrów połączenia.
from azure.eventhub import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")
Aby uzyskać więcej informacji, zobacz Bibliotekę EventHubConsumerClient
usługi EventHubs, aby uzyskać więcej informacji.
Korzystanie ze zdarzeń przy użyciu elementu do utworzenia punktu kontrolnego BlobCheckpointStore
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'
def on_event(partition_context, event):
# Put your code here.
partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_connection_str,
container_name
)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store,
)
with client:
client.receive(on_event)
if __name__ == '__main__':
main()
Używanie BlobCheckpointStore
z inną wersją interfejsu API usługi Azure Storage
Niektóre środowiska mają różne wersje interfejsu API usługi Azure Storage.
BlobCheckpointStore
domyślnie używa interfejsu API usługi Storage w wersji 2019-07-07. Aby używać go w innej wersji, określ api_version
podczas tworzenia BlobCheckpointStore
obiektu.
Rozwiązywanie problemów
Ogólne
Włączenie rejestrowania będzie pomocne w rozwiązywaniu problemów.
Rejestrowanie
- Włącz
azure.eventhub.extensions.checkpointstoreblob
rejestrator do zbierania śladów z biblioteki. - Włącz
azure.eventhub
rejestrator do zbierania śladów z głównej biblioteki azure-eventhub. - Włącz
azure.eventhub.extensions.checkpointstoreblob._vendor.storage
rejestrator do zbierania śladów z biblioteki obiektów blob usługi Azure Storage. - Włącz
uamqp
rejestrator do zbierania śladów z bazowej biblioteki uAMQP. - Włącz śledzenie na poziomie ramki protokołu AMQP przez ustawienie
logging_enable=True
podczas tworzenia klienta.
Następne kroki
Więcej przykładów kodu
Rozpocznij pracę z przykładami sklepu Checkpoint Store w usłudze EventHubs.
- receive_events_using_checkpoint_store.py — przykładowe zdarzenieHubConsumerClient z magazynem punktów kontrolnych obiektów blob
- receive_events_using_checkpoint_store_storage_api_version.py — przykład wersji eventHubConsumerClient z magazynem punktów kontrolnych obiektów blob i wersją magazynu
Dokumentacja
Dokumentacja referencyjna jest dostępna tutaj.
Przekazywanie opinii
Jeśli napotkasz jakiekolwiek usterki lub masz sugestie, zgłoś problem w sekcji Problemy w projekcie.
Współtworzenie
W tym projekcie zachęcamy do współtworzenia i zgłaszania sugestii. Współtworzenie w większości przypadków wymaga zgody na umowę licencyjną dotyczącą współautorów (CLA, Contributor License Agreement), zgodnie z którą współautor ma prawo udzielić i faktycznie udziela nam praw do używania wytworzonej przez siebie zawartości. Aby uzyskać szczegółowe informacje, odwiedź stronę https://cla.microsoft.com.
Po przesłaniu żądania ściągnięcia robot CLA automatycznie określi, czy musisz przekazać umowę CLA, i doda odpowiednie informacje do tego żądania (na przykład etykietę czy komentarz). Po prostu postępuj zgodnie z instrukcjami robota. Wystarczy zrobić to raz dla wszystkich repozytoriów, w przypadku których jest używana nasza umowa CLA.
W tym projekcie przyjęto Kodeks postępowania oprogramowania Open Source firmy Microsoft. Aby uzyskać więcej informacji, zobacz Często zadawane pytania dotyczące kodeksu postępowania lub skontaktuj się z opencode@microsoft.com dodatkowymi pytaniami lub komentarzami.
Azure SDK for Python