Biblioteca cliente del Almacén de puntos de comprobación de Azure EventHubs para Python: versión 1.1.4
uso de blobs de almacenamiento
El almacén de puntos de control de Azure EventHubs se usa para almacenar puntos de control durante el procesamiento de eventos de Azure Event Hubs.
Este paquete de almacén de puntos de control funciona como un paquete de complemento en EventHubConsumerClient
. Usa El blob de Azure Storage como almacén persistente para mantener los puntos de control y la información de propiedad de la partición.
Tenga en cuenta que se trata de una biblioteca asincrónica, para la versión de sincronización de la biblioteca cliente del Almacén de puntos de comprobación de Azure EventHubs, consulte azure-eventhub-checkpointstoreblob.
Código | fuente Paquete (PyPi) | Documentación | de referencia de API Documentación | de Azure EventHubsDocumentación de Azure Storage
Introducción
Requisitos previos
Python 3.6 o versiones posteriores.
Suscripción de Microsoft Azure: Para usar los servicios de Azure, incluida Azure Event Hubs, necesitará una suscripción. Si no tiene una cuenta de Azure existente, puede registrarse para obtener una evaluación gratuita o usar las ventajas del suscriptor de MSDN al crear una cuenta.
Espacio de nombres de Event Hubs con un centro de eventos: Para interactuar con Azure Event Hubs, también deberá tener un espacio de nombres y un centro de eventos disponibles. Si no está familiarizado con la creación de recursos de Azure, puede seguir la guía paso a paso para crear un centro de eventos mediante el Azure Portal. Allí también puede encontrar instrucciones detalladas para usar la CLI de Azure, Azure PowerShell o plantillas de Azure Resource Manager (ARM) para crear un centro de eventos.
Cuenta de Azure Storage: Tendrá que tener una cuenta de Azure Storage y crear un contenedor de bloques de Azure Blob Storage para almacenar los datos del punto de control con blobs. Puede seguir la guía para crear una cuenta de Azure Block Blob Storage.
Instalar el paquete
$ pip install azure-eventhub-checkpointstoreblob-aio
Conceptos clave
Puntos de control
Puntos de control es un proceso en el que los lectores marcan o confirman su posición dentro de la secuencia de eventos de una partición. La creación de puntos de comprobación es responsabilidad del consumidor y se realiza por partición dentro de un grupo de consumidores. Esta responsaibilidad significa que por cada grupo de consumidores, cada lector de la partición debe realizar un seguimiento de su posición actual en el flujo del evento y puede informar al servicio cuando considere que el flujo de datos se ha completado. Si se desconecta un lector de una partición, cuando se vuelve a conectar comienza a leer en el punto de comprobación que envió previamente el último lector de esa partición en ese grupo de consumidores. Cuando se conecta el lector, pasa este desplazamiento al centro de eventos para especificar la ubicación en la que se va a empezar a leer. De este modo, puede usar puntos de comprobación para marcar eventos como "completados" por las aplicaciones de bajada y para ofrecer resistencia en caso de que se produzca una conmutación por error entre lectores que se ejecutan en máquinas distintas. Es posible volver a los datos más antiguos especificando un desplazamiento inferior desde este proceso de puntos de comprobación. Mediante este mecanismo, los puntos de comprobación permiten una resistencia a la conmutación por error y una reproducción del flujo de eventos.
Desplazamientos de & números de secuencia
Ambos números de secuencia de desplazamiento & hacen referencia a la posición de un evento dentro de una partición. Puede considerarlos como un cursor del lado cliente. El desplazamiento es una numeración de byte del evento. El número de desplazamiento o secuencia permite a un consumidor de eventos (lector) especificar un punto en el flujo de eventos desde el que desean comenzar a leer eventos. Puede especificar una marca de tiempo de modo que reciba eventos en cola solo después de la marca de tiempo especificada. Los consumidores son responsables de almacenar sus propios valores de desplazamiento fuera del servicio de Event Hubs. Dentro de una partición, cada evento incluye un desplazamiento, un número de secuencia y la marca de tiempo de cuando se puso en cola.
Ejemplos
- Creación de una instancia de Azure EventHubs
EventHubConsumerClient
- Consumo de eventos mediante un
BlobCheckpointStore
Cree una clave privada RSA EventHubConsumerClient
.
La manera más fácil de crear es EventHubConsumerClient
usar una cadena de conexión.
from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")
Para ver otras formas de crear un EventHubConsumerClient
, consulte la biblioteca de EventHubs para obtener más detalles.
Consumir eventos mediante un BlobCheckpointStore
punto de control para realizar
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'
async def on_event(partition_context, event):
# Put your code here.
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_connection_str,
container_name
)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store,
)
async with client:
await client.receive(on_event)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Uso BlobCheckpointStore
con una versión diferente de la API del servicio Azure Storage
Algunos entornos tienen diferentes versiones de la API del servicio Azure Storage.
BlobCheckpointStore
de forma predeterminada usa la VERSIÓN 2019-07-07-07 de la API del servicio de almacenamiento. Para usarlo en otra versión, especifique api_version
al crear el BlobCheckpointStore
objeto .
Solución de problemas
General
Habilitar el registro será útil para solucionar problemas.
Registro
- Habilite el
azure.eventhub.extensions.checkpointstoreblobaio
registrador para recopilar seguimientos de la biblioteca. - Habilite el
azure.eventhub
registrador para recopilar seguimientos de la biblioteca principal de azure-eventhub. - Habilite el
azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage
registrador para recopilar seguimientos de la biblioteca de blobs de Azure Storage. - Habilite el
uamqp
registrador para recopilar seguimientos de la biblioteca uAMQP subyacente. - Habilite el seguimiento del nivel de fotograma de AMQP estableciendo
logging_enable=True
al crear el cliente.
Pasos siguientes
Más código de ejemplo
Empiece a trabajar con nuestros ejemplos asincrónicos de EventHubs Checkpoint Store.
- receive_events_using_checkpoint_store_async.py : ejemplo de EventHubConsumerClient con el almacén de puntos de comprobación de blobs
- receive_events_using_checkpoint_store_storage_api_version_async.py : EventHubConsumerClient con el almacén de puntos de control de blobs y el ejemplo de versión de almacenamiento
Documentación
La documentación de referencia está disponible aquí.
Envío de comentarios
Si encuentra algún error o tiene sugerencias, envíe un problema en la sección Problemas del proyecto.
Contribuciones
Este proyecto agradece las contribuciones y sugerencias. La mayoría de las contribuciones requieren que acepte un Contrato de licencia para el colaborador (CLA) que declara que tiene el derecho a concedernos y nos concede los derechos para usar su contribución. Para más detalles, visite https://cla.microsoft.com.
Cuando se envía una solicitud de incorporación de cambios, un bot de CLA determinará de forma automática si tiene que aportar un CLA y completar la PR adecuadamente (por ejemplo, la etiqueta, el comentario). Solo siga las instrucciones que le dará el bot. Solo será necesario que lo haga una vez en todos los repositorios con nuestro CLA.
Este proyecto ha adoptado el Código de conducta de Microsoft Open Source. Para más información, consulte las preguntas más frecuentes del código de conducta o póngase en contacto con opencode@microsoft.com si tiene cualquier otra pregunta o comentario.
Azure SDK for Python