Klientská knihovna služby Azure EventHubs Checkpoint Store pro Python – verze 1.1.4
použití objektů blob služby Storage
Azure EventHubs Checkpoint Store se používá k ukládání kontrolních bodů při zpracování událostí z Azure Event Hubs.
Tento balíček Checkpoint Store funguje jako balíček plug-in pro EventHubConsumerClient
. Objekt blob služby Azure Storage používá jako trvalé úložiště pro správu kontrolních bodů a informací o vlastnictví oddílů.
Upozorňujeme, že se jedná o synchronizační knihovnu. Pokud chcete asynchronní verzi klientské knihovny azure EventHubs Checkpoint Store, projděte si téma azure-eventhub-checkpointstoreblob-aio.
Zdrojový kód | Balíček (PyPi) | Referenční dokumentace k | rozhraní API Dokumentace ke | službě Azure EventHubsDokumentace ke službě Azure Storage
Začínáme
Požadavky
Python2.7, Python 3.6 nebo novější.
Předplatné Microsoft Azure: Abyste mohli používat služby Azure, včetně Azure Event Hubs, budete potřebovat předplatné. Pokud nemáte existující účet Azure, můžete si při vytváření účtu zaregistrovat bezplatnou zkušební verzi nebo využít výhody předplatného MSDN.
Obor názvů služby Event Hubs s centrem událostí: Abyste mohli pracovat s Azure Event Hubs, musíte mít také k dispozici obor názvů a centrum událostí. Pokud nemáte zkušenosti s vytvářením prostředků Azure, možná budete chtít postupovat podle podrobného průvodce vytvořením centra událostí pomocí Azure Portal. Najdete tam také podrobné pokyny k vytvoření centra událostí pomocí šablon Azure CLI, Azure PowerShell nebo Azure Resource Manager (ARM).
Účet úložiště Azure: Budete muset mít účet služby Azure Storage a vytvořit kontejner Azure Blob Storage bloku, abyste mohli ukládat data kontrolního bodu s objekty blob. Můžete postupovat podle průvodce vytvořením účtu služby Azure Block Blob Storage.
Instalace balíčku
$ pip install azure-eventhub-checkpointstoreblob
Klíčové koncepty
Vytváření kontrolních bodů
Vytváření kontrolních bodů je proces, pomocí kterého čtenáři označují nebo potvrzují svou pozici v rámci posloupnosti událostí v oddílu. Za vytváření kontrolních bodů zodpovídá příjemce. Proces probíhá na bázi oddílů ve skupinách příjemců. Taková zodpovědnost znamená, že si každý čtenář oddílu v každé skupině příjemců musí udržovat přehled o své aktuální pozici v datovém proudu událostí a může informovat službu, když bude považovat datový proud za dokončený. Pokud se čtenář z oddílu odpojí, začne při opětovném připojení číst od kontrolního bodu, který dříve zaslal poslední čtenář daného oddílu z této skupiny příjemců. Když se čtenář připojí, předá posun do centra událostí, aby určil umístění, ve kterém má začít číst. Takto můžete vytváření kontrolních bodů použít jak k označování událostí jako „dokončených“, tak k zajištění ochrany pro případ, že nastane selhání u čtenářů spuštěných na různých strojích. Ke starším datům se je možné vrátit tak, že určíte nižší posun od tohoto kontrolního bodu. Díky tomuto mechanismu umožňuje vytváření kontrolních bodů nejen obnovu při selhání, ale i opakované přehrání datového proudu.
Posuny & pořadových čísel
Obě pořadová čísla odsazení & odkazují na pozici události v rámci oddílu. Můžete si je představit jako kurzor na straně klienta. Posun je číslo bajtu události. Posun nebo pořadové číslo umožňuje příjemci události (čtenáři) určit bod v datovém proudu událostí, ze kterého chce začít číst události. Časové razítko můžete zadat tak, aby se události dostaly do fronty až po daném časovém razítku. Příjemci si sami zodpovídají za uložení svých hodnot posunu mimo službu Event Hubs. Každá událost v rámci oddílu obsahuje posun, pořadové číslo a časové razítko, kdy byla zařazení do fronty.
Příklady
Vytvořit EventHubConsumerClient
Nejjednodušší způsob, jak vytvořit EventHubConsumerClient
, je použít připojovací řetězec.
from azure.eventhub import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")
Další podrobnosti EventHubConsumerClient
najdete v knihovně EventHubs .
Využití událostí pomocí kontrolního BlobCheckpointStore
bodu
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'
def on_event(partition_context, event):
# Put your code here.
partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_connection_str,
container_name
)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store,
)
with client:
client.receive(on_event)
if __name__ == '__main__':
main()
Použití BlobCheckpointStore
s jinou verzí rozhraní API služby Azure Storage
Některá prostředí mají různé verze rozhraní API služby Azure Storage.
BlobCheckpointStore
ve výchozím nastavení používá rozhraní API služby Storage verze 2019-07-07. Chcete-li ji použít pro jinou verzi, zadejte api_version
při vytváření objektu BlobCheckpointStore
.
Poradce při potížích
Obecné
Povolení protokolování bude užitečné při odstraňování potíží.
protokolování
- Povolte
azure.eventhub.extensions.checkpointstoreblob
protokolovacímu nástroji shromažďování trasování z knihovny. - Povolte
azure.eventhub
protokolovacímu nástroji shromažďovat trasování z hlavní knihovny azure-eventhub. - Povolte
azure.eventhub.extensions.checkpointstoreblob._vendor.storage
protokolovacímu nástroji shromažďovat trasování z knihovny objektů blob služby Azure Storage. - Povolte
uamqp
protokolovacímu nástroji shromažďovat trasování ze základní knihovny uAMQP. - Povolte trasování na úrovni rámce AMQP nastavením
logging_enable=True
při vytváření klienta.
Další kroky
Další ukázkový kód
Začněte s našimi ukázkami služby EventHubs Checkpoint Store.
- receive_events_using_checkpoint_store.py – Příklad eventHubConsumerClient s úložištěm kontrolních bodů objektů blob
- receive_events_using_checkpoint_store_storage_api_version.py – Příklad eventHubConsumerClient s úložištěm kontrolních bodů objektů blob a verzí úložiště
Dokumentace
Referenční dokumentace je k dispozici tady.
Zadání zpětné vazby
Pokud narazíte na nějaké chyby nebo máte návrhy, nahlaste problém v části Problémy projektu.
Přispívání
Tento projekt vítá příspěvky a návrhy. Většina příspěvků vyžaduje souhlas s licenční smlouvou s přispěvatelem (CLA), která stanoví, že máte právo udělit nám práva k používání vašeho příspěvku a skutečně tak činíte. Podrobnosti najdete tady: https://cla.microsoft.com
Při odesílání žádosti o přijetí změn robot CLA automaticky určí, jestli je potřeba poskytnout smlouvu CLA, a příslušným způsobem žádost o přijetí změn upraví (např. přidáním jmenovky nebo komentáře). Stačí postupovat podle pokynů robota. Pro všechna úložiště používající naši smlouvu CLA to stačí udělat jenom jednou.
Tento projekt přijal pravidla chování pro Microsoft Open Source. Další informace najdete v nejčastějších dotazech k pravidlům chování nebo se obraťte na opencode@microsoft.com případné další dotazy nebo komentáře.
Azure SDK for Python