Megosztás a következőn keresztül:


Azure EventHubs Checkpoint Store ügyfélkódtár Pythonhoz – 1.1.4-es verzió

tárolóblobok használata

Az Azure EventHubs Checkpoint Store az ellenőrzőpontok tárolására szolgál, miközben Azure Event Hubs eseményeket dolgoz fel. Ez a Checkpoint Store-csomag beépülő modulként működik a következőhöz: EventHubConsumerClient. Az Azure Storage Blobot használja állandó tárolóként az ellenőrzőpontok és a partíció tulajdonjogi adatainak karbantartásához.

Vegye figyelembe, hogy ez egy aszinkron kódtár, az Azure EventHubs Checkpoint Store ügyfélkódtár szinkronizálási verziójához tekintse meg az azure-eventhub-checkpointstoreblob című témakört.

Forráskód | Csomag (PyPi) | API-referenciadokumentáció | Az Azure Eventhubs dokumentációja | Az Azure Storage dokumentációja

Első lépések

Előfeltételek

  • Python 3.6 vagy újabb.

  • Microsoft Azure-előfizetés: Az Azure-szolgáltatások, köztük a Azure Event Hubs használatához előfizetésre lesz szüksége. Ha nem rendelkezik meglévő Azure-fiókkal, regisztrálhat egy ingyenes próbaverzióra, vagy használhatja az MSDN-előfizetői előnyöket a fiók létrehozásakor.

  • Event Hubs-névtér eseményközponttal: A Azure Event Hubs használatához egy névtérnek és az Event Hubnak is rendelkezésre kell állnia. Ha nem ismeri az Azure-erőforrások létrehozását, érdemes lehet követnie az eseményközpontok Azure Portal használatával történő létrehozásához szükséges részletes útmutatót. Itt részletes útmutatást is találhat az Azure CLI, a Azure PowerShell vagy az Azure Resource Manager (ARM) sablonok event hub létrehozásához való használatához.

  • Azure Storage-fiók: Rendelkeznie kell egy Azure Storage-fiókkal, és létre kell hoznia egy Azure Blob Storage Blokktárolót az ellenőrzőpont-adatok blobokkal való tárolásához. Kövesse az Azure Block Blob Storage-fiók létrehozását ismertető útmutatót.

A csomag telepítése

$ pip install azure-eventhub-checkpointstoreblob-aio

Fő fogalmak

Ellenőrző pontok használata

Az ellenőrzőpontok használatával az olvasók megjelölhetik vagy véglegesíthetik pozíciójukat a partíciók eseménysorozatában. Az ellenőrzőpontok használata a felhasználó felelőssége, és partíciónkénti alapon történik a felhasználói csoportban. A felelősség itt azt jelenti, hogy mindegyik felhasználói csoport esetében mindegyik partícióolvasónak nyilván kell tartania aktuális pozícióját az eseménystreamben, és tájékoztathatja a szolgáltatást, amikor az adatstreamet befejezettnek tekinti. Ha egy olvasó lecsatlakozik egy partícióról, az újracsatlakozáskor az adott felhasználói csoportban az adott partíció utolsó olvasója által elküldött ellenőrzőpontnál kezdi az olvasást. Amikor az olvasó csatlakozik, átadja az eltolást az eseményközpontnak, és megadja az olvasás megkezdésének helyét. Az ellenőrzőpontok használatával az alárendelt alkalmazások így megjelölhetik az eseményeket „befejezettként”, valamint biztosítható a rugalmasság a különböző gépeken futó olvasók közötti feladatátvétel esetén. Lehetséges visszatérni a régebbi adatokhoz egy alacsonyabb értékű eltolás megadásával az ellenőrzőpontok használata során. Ezzel a mechanizmussal az ellenőrzőpontok használata rugalmasságot biztosít feladatátvétel esetén, és lehetővé teszi az eseménystream visszajátszását.

Sorszámok eltolása &

Mindkét eltolási & sorszám egy esemény partíción belüli pozíciójára utal. Ügyféloldali kurzorként tekinthet rájuk. Az eltolás az esemény bájtalapú sorszáma. Az eltolás/sorszám lehetővé teszi, hogy az eseményfogyó (olvasó) megadjon egy pontot abban az eseménystreamben, ahonnan az eseményeket olvasni szeretné. Megadhat egy időbélyeget, hogy az eseményeket csak a megadott időbélyeg után kapja meg. A felhasználók felelőssége saját eltolásértékeik tárolása az Event Hubs szolgáltatáson kívül. A partíciókon belül minden esemény tartalmaz egy eltolást, egy sorszámot és annak időbélyegét, hogy mikor lett leküldve.

Példák

Hozzon létre egy EventHubConsumerClient

A legegyszerűbben úgy hozhat létre egy EventHubConsumerClient kapcsolati sztring.

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

A létrehozás EventHubConsumerClientegyéb módjaiért tekintse meg az EventHubs-kódtárat .

Események felhasználása ellenőrzőpont használatával BlobCheckpointStore

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Használat BlobCheckpointStore az Azure Storage Service API másik verziójával

Egyes környezetek az Azure Storage Service API különböző verzióival rendelkeznek. BlobCheckpointStore alapértelmezés szerint a Storage Service API 2019-07-07-es verzióját használja. Ha egy másik verzióban szeretné használni, adja meg api_version , hogy mikor hozza létre az BlobCheckpointStore objektumot.

Hibaelhárítás

Általános kérdések

A naplózás engedélyezése hasznos lehet a hibaelhárításhoz.

Naplózás

  • Engedélyezze azure.eventhub.extensions.checkpointstoreblobaio a naplózónak, hogy nyomkövetéseket gyűjtsön a kódtárból.
  • Engedélyezze azure.eventhub a naplózónak, hogy nyomkövetéseket gyűjtsön a fő azure-eventhub-kódtárból.
  • Engedélyezze azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage a naplózónak, hogy nyomkövetéseket gyűjtsön az Azure Storage blobtárából.
  • Engedélyezze uamqp a naplózónak, hogy nyomkövetéseket gyűjtsön a mögöttes uAMQP-kódtárból.
  • Engedélyezze az AMQP keretszintű nyomkövetését az ügyfél létrehozásakor beállított beállítással logging_enable=True .

Következő lépések

További mintakód

Ismerkedés az EventHubs Checkpoint Store aszinkron mintáival.

Dokumentáció

A referenciadokumentáció itt érhető el.

Visszajelzés küldése

Ha bármilyen hibába ütközik, vagy javaslatai vannak, küldjön egy problémát a projekt Problémák szakaszában.

Közreműködés

A projektben szívesen fogadjuk a hozzájárulásokat és a javaslatokat. A legtöbb hozzájáruláshoz el kell fogadnia egy Közreműködői licencszerződést (CLA-t), amelyben kijelenti, hogy jogosult arra, hogy ránk ruházza hozzájárulása felhasználási jogát, és ezt ténylegesen meg is teszi. További részletekért lásd: https://cla.microsoft.com.

A lekéréses kérelmek elküldésekor egy CLA-robot automatikusan meghatározza, hogy kell-e biztosítania CLA-t, és megfelelően kitölti a lekéréses kérelmet (például címke, megjegyzés). Egyszerűen csak kövesse a robot által megadott utasításokat. Ezt csak egyszer kell elvégeznie az összes olyan tárházban, amely a CLA-t használja.

A projekt a Microsoft nyílt forráskódú projekteket szabályozó etikai kódexe, a Microsoft Open Source Code of Conduct hatálya alá esik. További információkért lásd a viselkedési szabályzattal kapcsolatos gyakori kérdéseket , vagy vegye fel a kapcsolatot opencode@microsoft.com az esetleges további kérdésekkel vagy megjegyzésekkel.

Megjelenések