Sdílet prostřednictvím


Klientská knihovna služby Azure EventHubs Checkpoint Store pro Python – verze 1.1.4

použití objektů blob služby Storage

Azure EventHubs Checkpoint Store se používá k ukládání kontrolních bodů při zpracování událostí z Azure Event Hubs. Tento balíček Checkpoint Store funguje jako balíček plug-in pro EventHubConsumerClient. Objekt blob služby Azure Storage používá jako trvalé úložiště pro správu kontrolních bodů a informací o vlastnictví oddílů.

Upozorňujeme, že se jedná o synchronizační knihovnu. Pokud chcete asynchronní verzi klientské knihovny azure EventHubs Checkpoint Store, projděte si téma azure-eventhub-checkpointstoreblob-aio.

Zdrojový kód | Balíček (PyPi) | Referenční dokumentace k | rozhraní API Dokumentace ke | službě Azure EventHubsDokumentace ke službě Azure Storage

Začínáme

Požadavky

  • Python2.7, Python 3.6 nebo novější.

  • Předplatné Microsoft Azure: Abyste mohli používat služby Azure, včetně Azure Event Hubs, budete potřebovat předplatné. Pokud nemáte existující účet Azure, můžete si při vytváření účtu zaregistrovat bezplatnou zkušební verzi nebo využít výhody předplatného MSDN.

  • Obor názvů služby Event Hubs s centrem událostí: Abyste mohli pracovat s Azure Event Hubs, musíte mít také k dispozici obor názvů a centrum událostí. Pokud nemáte zkušenosti s vytvářením prostředků Azure, možná budete chtít postupovat podle podrobného průvodce vytvořením centra událostí pomocí Azure Portal. Najdete tam také podrobné pokyny k vytvoření centra událostí pomocí šablon Azure CLI, Azure PowerShell nebo Azure Resource Manager (ARM).

  • Účet úložiště Azure: Budete muset mít účet služby Azure Storage a vytvořit kontejner Azure Blob Storage bloku, abyste mohli ukládat data kontrolního bodu s objekty blob. Můžete postupovat podle průvodce vytvořením účtu služby Azure Block Blob Storage.

Instalace balíčku

$ pip install azure-eventhub-checkpointstoreblob

Klíčové koncepty

Vytváření kontrolních bodů

Vytváření kontrolních bodů je proces, pomocí kterého čtenáři označují nebo potvrzují svou pozici v rámci posloupnosti událostí v oddílu. Za vytváření kontrolních bodů zodpovídá příjemce. Proces probíhá na bázi oddílů ve skupinách příjemců. Taková zodpovědnost znamená, že si každý čtenář oddílu v každé skupině příjemců musí udržovat přehled o své aktuální pozici v datovém proudu událostí a může informovat službu, když bude považovat datový proud za dokončený. Pokud se čtenář z oddílu odpojí, začne při opětovném připojení číst od kontrolního bodu, který dříve zaslal poslední čtenář daného oddílu z této skupiny příjemců. Když se čtenář připojí, předá posun do centra událostí, aby určil umístění, ve kterém má začít číst. Takto můžete vytváření kontrolních bodů použít jak k označování událostí jako „dokončených“, tak k zajištění ochrany pro případ, že nastane selhání u čtenářů spuštěných na různých strojích. Ke starším datům se je možné vrátit tak, že určíte nižší posun od tohoto kontrolního bodu. Díky tomuto mechanismu umožňuje vytváření kontrolních bodů nejen obnovu při selhání, ale i opakované přehrání datového proudu.

Posuny & pořadových čísel

Obě pořadová čísla odsazení & odkazují na pozici události v rámci oddílu. Můžete si je představit jako kurzor na straně klienta. Posun je číslo bajtu události. Posun nebo pořadové číslo umožňuje příjemci události (čtenáři) určit bod v datovém proudu událostí, ze kterého chce začít číst události. Časové razítko můžete zadat tak, aby se události dostaly do fronty až po daném časovém razítku. Příjemci si sami zodpovídají za uložení svých hodnot posunu mimo službu Event Hubs. Každá událost v rámci oddílu obsahuje posun, pořadové číslo a časové razítko, kdy byla zařazení do fronty.

Příklady

Vytvořit EventHubConsumerClient

Nejjednodušší způsob, jak vytvořit EventHubConsumerClient , je použít připojovací řetězec.

from azure.eventhub import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

Další podrobnosti EventHubConsumerClientnajdete v knihovně EventHubs .

Využití událostí pomocí kontrolního BlobCheckpointStore bodu


from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'


def on_event(partition_context, event):
    # Put your code here.
    partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    with client:
        client.receive(on_event)

if __name__ == '__main__':
    main()

Použití BlobCheckpointStore s jinou verzí rozhraní API služby Azure Storage

Některá prostředí mají různé verze rozhraní API služby Azure Storage. BlobCheckpointStore ve výchozím nastavení používá rozhraní API služby Storage verze 2019-07-07. Chcete-li ji použít pro jinou verzi, zadejte api_version při vytváření objektu BlobCheckpointStore .

Poradce při potížích

Obecné

Povolení protokolování bude užitečné při odstraňování potíží.

protokolování

  • Povolte azure.eventhub.extensions.checkpointstoreblob protokolovacímu nástroji shromažďování trasování z knihovny.
  • Povolte azure.eventhub protokolovacímu nástroji shromažďovat trasování z hlavní knihovny azure-eventhub.
  • Povolte azure.eventhub.extensions.checkpointstoreblob._vendor.storage protokolovacímu nástroji shromažďovat trasování z knihovny objektů blob služby Azure Storage.
  • Povolte uamqp protokolovacímu nástroji shromažďovat trasování ze základní knihovny uAMQP.
  • Povolte trasování na úrovni rámce AMQP nastavením logging_enable=True při vytváření klienta.

Další kroky

Další ukázkový kód

Začněte s našimi ukázkami služby EventHubs Checkpoint Store.

Dokumentace

Referenční dokumentace je k dispozici tady.

Zadání zpětné vazby

Pokud narazíte na nějaké chyby nebo máte návrhy, nahlaste problém v části Problémy projektu.

Přispívání

Tento projekt vítá příspěvky a návrhy. Většina příspěvků vyžaduje souhlas s licenční smlouvou s přispěvatelem (CLA), která stanoví, že máte právo udělit nám práva k používání vašeho příspěvku a skutečně tak činíte. Podrobnosti najdete tady: https://cla.microsoft.com

Při odesílání žádosti o přijetí změn robot CLA automaticky určí, jestli je potřeba poskytnout smlouvu CLA, a příslušným způsobem žádost o přijetí změn upraví (např. přidáním jmenovky nebo komentáře). Stačí postupovat podle pokynů robota. Pro všechna úložiště používající naši smlouvu CLA to stačí udělat jenom jednou.

Tento projekt přijal pravidla chování pro Microsoft Open Source. Další informace najdete v nejčastějších dotazech k pravidlům chování nebo se obraťte na opencode@microsoft.com případné další dotazy nebo komentáře.

Imprese