Share via


Azure EventHubs Checkpoint Store-klientbibliotek för Python – version 1.1.4

använda lagringsblobar

Azure EventHubs Checkpoint Store används för att lagra kontrollpunkter vid bearbetning av händelser från Azure Event Hubs. Det här Checkpoint Store-paketet fungerar som ett plugin-paket till EventHubConsumerClient. Den använder Azure Storage Blob som beständig lagring för att underhålla kontrollpunkter och information om partitionsägarskap.

Observera att det här är ett asynkront bibliotek för synkroniseringsversion av Azure EventHubs Checkpoint Store-klientbiblioteket. Mer information finns i azure-eventhub-checkpointstoreblob.

Källkod | Paket (PyPi) | API-referensdokumentation | Dokumentation | om Azure EventhubsDokumentation om Azure Storage

Komma igång

Förutsättningar

  • Python 3.6 eller senare.

  • Microsoft Azure-prenumeration: Om du vill använda Azure-tjänster, inklusive Azure Event Hubs, behöver du en prenumeration. Om du inte har ett befintligt Azure-konto kan du registrera dig för en kostnadsfri utvärderingsversion eller använda dina MSDN-prenumerantförmåner när du skapar ett konto.

  • Event Hubs-namnrymd med en händelsehubb: Om du vill interagera med Azure Event Hubs måste du också ha ett namnområde och en händelsehubb tillgänglig. Om du inte är bekant med att skapa Azure-resurser kan du följa den stegvisa guiden för att skapa en händelsehubb med hjälp av Azure Portal. Där hittar du även detaljerade instruktioner för att skapa en händelsehubb med hjälp av Mallarna Azure CLI, Azure PowerShell eller Azure Resource Manager (ARM).

  • Azure Storage-konto: Du måste ha ett Azure Storage-konto och skapa en Azure Blob Storage Blockera container för att lagra kontrollpunktsdata med blobar. Du kan följa guiden för att skapa ett Azure Block Blob Storage-konto.

Installera paketet

$ pip install azure-eventhub-checkpointstoreblob-aio

Viktiga begrepp

Kontrollpunkter

Att skapa kontrollpunkter är en process genom vilken läsare markerar eller sparar sin position inom en händelsesekvens i en partition. Att skapa kontrollpunkter är konsumentens ansvar och görs för varje partition i en konsumentgrupp. Det här ansvaret innebär att varje läsare i partitionen måste hålla reda på sin nuvarande position i händelseströmmen för varje konsumentgrupp. Läsaren kan sedan informera tjänsten när de anser att dataströmmen är klar. Om en läsare kopplar från en partition och den sedan återansluts kan han börja läsa vid den kontrollpunkt som tidigare skickades in av den senaste läsaren i den aktuella partitionen inom just den konsumentgruppen. När läsaren ansluter skickas förskjutningen till händelsehubben för att ange den plats där läsningen ska börja. På så sätt kan du använda kontrollpunkter både till att markera händelser som ”klara” i underordnade program och som skydd i händelse av en redundansväxling mellan läsare som körs på olika datorer. Du kan återgå till äldre data genom att ange en lägre offset i den här kontrollpunktsprocessen. Den här mekanismen möjliggör både återhämtning vid redundansväxlingar och återuppspelning av händelseströmmar.

Förskjuter & sekvensnummer

Båda förskjutningssekvensnumret & refererar till positionen för en händelse i en partition. Du kan se dem som en markör på klientsidan. Denna offset är en byte-numrering av händelsen. Med förskjutnings-/sekvensnumret kan en händelsekonsument (läsare) ange en punkt i händelseströmmen som de vill börja läsa händelser från. Du kan ange en tidsstämpel så att du får händelser som anges först efter den angivna tidsstämpeln. Konsumenterna ansvarar för att lagra sina egna offset-värden utanför händelsehubbtjänsten. I en partition innehåller varje händelse en förskjutning, ett sekvensnummer och tidsstämpeln för när den angavs.

Exempel

Skapa en EventHubConsumerClient

Det enklaste sättet att skapa en EventHubConsumerClient är att använda en anslutningssträng.

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

Andra sätt att skapa ett EventHubConsumerClientfinns i EventHubs-biblioteket för mer information.

Använda händelser med hjälp av en BlobCheckpointStore att göra-kontrollpunkt

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Använda BlobCheckpointStore med en annan version av Azure Storage Service API

Vissa miljöer har olika versioner av Azure Storage Service API. BlobCheckpointStore Som standard använder Storage Service API version 2019-07-07. Om du vill använda det mot en annan version anger api_version du när du skapar BlobCheckpointStore objektet.

Felsökning

Allmänt

Det är bra att aktivera loggning för att felsöka.

Loggning

  • Aktivera azure.eventhub.extensions.checkpointstoreblobaio loggning för att samla in spårningar från biblioteket.
  • Aktivera azure.eventhub loggning för att samla in spårningar från huvudbiblioteket för azure-eventhub.
  • Aktivera azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage loggning för att samla in spårningar från Azure Storage Blob Library.
  • Aktivera uamqp loggning för att samla in spårningar från det underliggande uAMQP-biblioteket.
  • Aktivera SPÅRNING AV AMQP-ramnivå genom att ange logging_enable=True när klienten skapas.

Nästa steg

Mer exempelkod

Kom igång med våra asynkrona exempel för EventHubs Checkpoint Store.

Dokumentation

Referensdokumentation finns här.

Ge feedback

Om du stöter på buggar eller har förslag kan du ange ett problem i avsnittet Problem i projektet.

Bidra

Det här projektet välkomnar bidrag och förslag. Merparten av bidragen kräver att du godkänner ett licensavtal för bidrag, där du deklarerar att du har behörighet att bevilja oss rättigheten att använda ditt bidrag, och att du dessutom uttryckligen gör så. Mer information finns på https://cla.microsoft.com.

När du skickar en pull-förfrågan avgör en CLA-robot automatiskt om du måste tillhandahålla ett licensavtal för bidrag med lämplig PR (t.ex. etikett eller kommentar). Följ bara robotens anvisningar. Du behöver bara göra detta en gång för alla repor som använder vårt licensavtal för bidrag.

Det här projektet använder sig av Microsofts uppförandekod för öppen källkod. Mer information finns i Vanliga frågor och svar om uppförandekod eller kontakt opencode@microsoft.com med ytterligare frågor eller kommentarer.

Visningar