Compartir a través de


Biblioteca cliente del Almacén de puntos de comprobación de Azure EventHubs para Python: versión 1.1.4

uso de blobs de almacenamiento

El almacén de puntos de control de Azure EventHubs se usa para almacenar puntos de control durante el procesamiento de eventos de Azure Event Hubs. Este paquete de almacén de puntos de control funciona como un paquete de complemento en EventHubConsumerClient. Usa El blob de Azure Storage como almacén persistente para mantener los puntos de control y la información de propiedad de la partición.

Tenga en cuenta que se trata de una biblioteca asincrónica, para la versión de sincronización de la biblioteca cliente del Almacén de puntos de comprobación de Azure EventHubs, consulte azure-eventhub-checkpointstoreblob.

Código | fuente Paquete (PyPi) | Documentación | de referencia de API Documentación | de Azure EventHubsDocumentación de Azure Storage

Introducción

Requisitos previos

  • Python 3.6 o versiones posteriores.

  • Suscripción de Microsoft Azure: Para usar los servicios de Azure, incluida Azure Event Hubs, necesitará una suscripción. Si no tiene una cuenta de Azure existente, puede registrarse para obtener una evaluación gratuita o usar las ventajas del suscriptor de MSDN al crear una cuenta.

  • Espacio de nombres de Event Hubs con un centro de eventos: Para interactuar con Azure Event Hubs, también deberá tener un espacio de nombres y un centro de eventos disponibles. Si no está familiarizado con la creación de recursos de Azure, puede seguir la guía paso a paso para crear un centro de eventos mediante el Azure Portal. Allí también puede encontrar instrucciones detalladas para usar la CLI de Azure, Azure PowerShell o plantillas de Azure Resource Manager (ARM) para crear un centro de eventos.

  • Cuenta de Azure Storage: Tendrá que tener una cuenta de Azure Storage y crear un contenedor de bloques de Azure Blob Storage para almacenar los datos del punto de control con blobs. Puede seguir la guía para crear una cuenta de Azure Block Blob Storage.

Instalar el paquete

$ pip install azure-eventhub-checkpointstoreblob-aio

Conceptos clave

Puntos de control

Puntos de control es un proceso en el que los lectores marcan o confirman su posición dentro de la secuencia de eventos de una partición. La creación de puntos de comprobación es responsabilidad del consumidor y se realiza por partición dentro de un grupo de consumidores. Esta responsaibilidad significa que por cada grupo de consumidores, cada lector de la partición debe realizar un seguimiento de su posición actual en el flujo del evento y puede informar al servicio cuando considere que el flujo de datos se ha completado. Si se desconecta un lector de una partición, cuando se vuelve a conectar comienza a leer en el punto de comprobación que envió previamente el último lector de esa partición en ese grupo de consumidores. Cuando se conecta el lector, pasa este desplazamiento al centro de eventos para especificar la ubicación en la que se va a empezar a leer. De este modo, puede usar puntos de comprobación para marcar eventos como "completados" por las aplicaciones de bajada y para ofrecer resistencia en caso de que se produzca una conmutación por error entre lectores que se ejecutan en máquinas distintas. Es posible volver a los datos más antiguos especificando un desplazamiento inferior desde este proceso de puntos de comprobación. Mediante este mecanismo, los puntos de comprobación permiten una resistencia a la conmutación por error y una reproducción del flujo de eventos.

Desplazamientos de & números de secuencia

Ambos números de secuencia de desplazamiento & hacen referencia a la posición de un evento dentro de una partición. Puede considerarlos como un cursor del lado cliente. El desplazamiento es una numeración de byte del evento. El número de desplazamiento o secuencia permite a un consumidor de eventos (lector) especificar un punto en el flujo de eventos desde el que desean comenzar a leer eventos. Puede especificar una marca de tiempo de modo que reciba eventos en cola solo después de la marca de tiempo especificada. Los consumidores son responsables de almacenar sus propios valores de desplazamiento fuera del servicio de Event Hubs. Dentro de una partición, cada evento incluye un desplazamiento, un número de secuencia y la marca de tiempo de cuando se puso en cola.

Ejemplos

Cree una clave privada RSA EventHubConsumerClient.

La manera más fácil de crear es EventHubConsumerClient usar una cadena de conexión.

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

Para ver otras formas de crear un EventHubConsumerClient, consulte la biblioteca de EventHubs para obtener más detalles.

Consumir eventos mediante un BlobCheckpointStore punto de control para realizar

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Uso BlobCheckpointStore con una versión diferente de la API del servicio Azure Storage

Algunos entornos tienen diferentes versiones de la API del servicio Azure Storage. BlobCheckpointStore de forma predeterminada usa la VERSIÓN 2019-07-07-07 de la API del servicio de almacenamiento. Para usarlo en otra versión, especifique api_version al crear el BlobCheckpointStore objeto .

Solución de problemas

General

Habilitar el registro será útil para solucionar problemas.

Registro

  • Habilite el azure.eventhub.extensions.checkpointstoreblobaio registrador para recopilar seguimientos de la biblioteca.
  • Habilite el azure.eventhub registrador para recopilar seguimientos de la biblioteca principal de azure-eventhub.
  • Habilite el azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage registrador para recopilar seguimientos de la biblioteca de blobs de Azure Storage.
  • Habilite el uamqp registrador para recopilar seguimientos de la biblioteca uAMQP subyacente.
  • Habilite el seguimiento del nivel de fotograma de AMQP estableciendo logging_enable=True al crear el cliente.

Pasos siguientes

Más código de ejemplo

Empiece a trabajar con nuestros ejemplos asincrónicos de EventHubs Checkpoint Store.

Documentación

La documentación de referencia está disponible aquí.

Envío de comentarios

Si encuentra algún error o tiene sugerencias, envíe un problema en la sección Problemas del proyecto.

Contribuciones

Este proyecto agradece las contribuciones y sugerencias. La mayoría de las contribuciones requieren que acepte un Contrato de licencia para el colaborador (CLA) que declara que tiene el derecho a concedernos y nos concede los derechos para usar su contribución. Para más detalles, visite https://cla.microsoft.com.

Cuando se envía una solicitud de incorporación de cambios, un bot de CLA determinará de forma automática si tiene que aportar un CLA y completar la PR adecuadamente (por ejemplo, la etiqueta, el comentario). Solo siga las instrucciones que le dará el bot. Solo será necesario que lo haga una vez en todos los repositorios con nuestro CLA.

Este proyecto ha adoptado el Código de conducta de Microsoft Open Source. Para más información, consulte las preguntas más frecuentes del código de conducta o póngase en contacto con opencode@microsoft.com si tiene cualquier otra pregunta o comentario.

Impresiones