Sdílet prostřednictvím


Klientská knihovna úložiště kontrolních bodů Azure EventHubs pro Python – verze 1.1.4

použití objektů blob služby Storage

Úložiště kontrolních bodů Azure EventHubs slouží k ukládání kontrolních bodů při zpracování událostí z Azure Event Hubs. Tento balíček Checkpoint Store funguje jako balíček plug-in pro EventHubConsumerClient. K údržbě kontrolních bodů a informací o vlastnictví oddílů používá objekt blob služby Azure Storage jako trvalé úložiště.

Upozorňujeme, že se jedná o asynchronní knihovnu. Synchronizační verzi klientské knihovny úložiště kontrolních bodů Azure EventHubs najdete v tématu azure-eventhub-checkpointstoreblob.

Zdrojový kód | Balíček (PyPi) | Referenční dokumentace k | rozhraní API Dokumentace ke | službě Azure EventHubsDokumentace ke službě Azure Storage

Začínáme

Požadavky

  • Python 3.6 nebo novější.

  • Předplatné Microsoft Azure: Pokud chcete používat služby Azure, včetně Azure Event Hubs, budete potřebovat předplatné. Pokud nemáte účet Azure, můžete si při vytváření účtu zaregistrovat bezplatnou zkušební verzi nebo využít výhody pro předplatitele MSDN.

  • Obor názvů služby Event Hubs s centrem událostí: Pokud chcete pracovat s Azure Event Hubs, musíte mít také k dispozici obor názvů a centrum událostí. Pokud nejste obeznámeni s vytvářením prostředků Azure, možná budete chtít postupovat podle podrobného průvodce pro vytvoření centra událostí pomocí Azure Portal. Najdete tam také podrobné pokyny k vytvoření centra událostí pomocí Azure CLI, Azure PowerShell nebo šablon Azure Resource Manager (ARM).

  • Účet úložiště Azure: Budete muset mít účet úložiště Azure a vytvořit kontejner Azure Blob Storage bloků, abyste mohli ukládat data kontrolních bodů s objekty blob. Můžete postupovat podle průvodce vytvořením účtu úložiště objektů blob bloku Azure.

Instalace balíčku

$ pip install azure-eventhub-checkpointstoreblob-aio

Klíčové koncepty

Vytváření kontrolních bodů

Vytváření kontrolních bodů je proces, pomocí kterého čtenáři označují nebo potvrzují svou pozici v rámci posloupnosti událostí v oddílu. Za vytváření kontrolních bodů zodpovídá příjemce. Proces probíhá na bázi oddílů ve skupinách příjemců. Taková zodpovědnost znamená, že si každý čtenář oddílu v každé skupině příjemců musí udržovat přehled o své aktuální pozici v datovém proudu událostí a může informovat službu, když bude považovat datový proud za dokončený. Pokud se čtenář z oddílu odpojí, začne při opětovném připojení číst od kontrolního bodu, který dříve zaslal poslední čtenář daného oddílu z této skupiny příjemců. Když se čtenář připojí, předá posun do centra událostí, aby určil umístění, ve kterém se má začít číst. Takto můžete vytváření kontrolních bodů použít jak k označování událostí jako „dokončených“, tak k zajištění ochrany pro případ, že nastane selhání u čtenářů spuštěných na různých strojích. Ke starším datům se je možné vrátit tak, že určíte nižší posun od tohoto kontrolního bodu. Díky tomuto mechanismu umožňuje vytváření kontrolních bodů nejen obnovu při selhání, ale i opakované přehrání datového proudu.

Posune pořadová čísla.&

Obě pořadová čísla odsazení & odkazují na pozici události v rámci oddílu. Můžete si je představit jako kurzor na straně klienta. Posun je číslo bajtu události. Posun/pořadové číslo umožňuje příjemci událostí (čtenáři) určit bod v datovém proudu událostí, ze kterého chce začít číst události. Můžete zadat časové razítko tak, aby se události zapsaly do fronty až po daném časovém razítku. Příjemci si sami zodpovídají za uložení svých hodnot posunu mimo službu Event Hubs. V rámci oddílu obsahuje každá událost posun, pořadové číslo a časové razítko zařazení do fronty.

Příklady

Vytvořit EventHubConsumerClient

Nejjednodušší způsob, jak vytvořit EventHubConsumerClient , je použít připojovací řetězec.

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

Další podrobnosti najdete EventHubConsumerClientv knihovně EventHubs .

Využívání událostí pomocí kontrolního BlobCheckpointStore bodu

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Použití BlobCheckpointStore s jinou verzí rozhraní API služby Azure Storage

Některá prostředí mají různé verze rozhraní API služby Azure Storage. BlobCheckpointStore ve výchozím nastavení používá rozhraní API služby Storage verze 2019-07-07. Pokud ho chcete použít pro jinou verzi, zadejte api_version při vytváření objektu BlobCheckpointStore .

Poradce při potížích

Obecné

Povolení protokolování bude užitečné při řešení potíží.

protokolování

  • Povolte azure.eventhub.extensions.checkpointstoreblobaio protokolovacímu nástroje shromažďovat trasování z knihovny.
  • Povolte azure.eventhub protokolovacímu nástroji shromažďování trasování z hlavní knihovny azure-eventhub.
  • Povolte azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage protokolovacímu nástroji shromažďování trasování z knihovny objektů blob služby Azure Storage.
  • Povolte uamqp protokolovacímu nástroji shromažďování trasování ze základní knihovny uAMQP.
  • Povolte trasování na úrovni rámce AMQP nastavením logging_enable=True při vytváření klienta.

Další kroky

Další vzorový kód

Začněte s našimi asynchronními ukázkami kontrolního bodu EventHubs v úložišti.

Dokumentace

Referenční dokumentace je k dispozici tady.

Zadání zpětné vazby

Pokud narazíte na nějaké chyby nebo máte návrhy, nahlaste problém v části Problémy projektu.

Přispívání

Tento projekt vítá příspěvky a návrhy. Většina příspěvků vyžaduje souhlas s licenční smlouvou s přispěvatelem (CLA), která stanoví, že máte právo udělit nám práva k používání vašeho příspěvku a skutečně tak činíte. Podrobnosti najdete tady: https://cla.microsoft.com

Při odesílání žádosti o přijetí změn robot CLA automaticky určí, jestli je potřeba poskytnout smlouvu CLA, a příslušným způsobem žádost o přijetí změn upraví (např. přidáním jmenovky nebo komentáře). Stačí postupovat podle pokynů robota. Pro všechna úložiště používající naši smlouvu CLA to stačí udělat jenom jednou.

Tento projekt přijal pravidla chování pro Microsoft Open Source. Další informace najdete v nejčastějších dotazech k pravidlům chování nebo kontaktujte s opencode@microsoft.com případnými dalšími dotazy nebo připomínkami.

Imprese