Compartir a través de


biblioteca cliente de Azure Event Hubs para Python: versión 5.11.5

Azure Event Hubs es un servicio de publicación y suscripción altamente escalable que puede ingerir millones de eventos por segundo y transmitirlos a varios consumidores. Esto le permite procesar y analizar grandes cantidades de datos generados por los dispositivos y aplicaciones conectados. Una vez que Event Hubs ha recopilado los datos, puede recuperarlos, transformarlos y almacenarlos mediante cualquier proveedor de análisis en tiempo real o con adaptadores de almacenamiento o procesamiento por lotes. Si desea obtener más información sobre Azure Event Hubs, puede que desee revisar: ¿Qué es Event Hubs?

La biblioteca cliente de Azure Event Hubs permite publicar y consumir eventos de Azure Event Hubs y se puede usar para lo siguiente:

  • Emitir telemetría sobre la aplicación con fines de diagnóstico e inteligencia empresarial.
  • Publicar datos sobre el estado de la aplicación que las partes interesadas pueden consultar y usar como desencadenadores para tomar medidas.
  • Observar las operaciones e interacciones interesantes que se están produciendo en el negocio u otro ecosistema, lo que permite que los sistemas de acoplamiento flexible interactúen sin necesidad de enlazarlos.
  • Recibir eventos de uno o varios editores, transformarlos para satisfacer mejor las necesidades del ecosistema y, a continuación, publicar los eventos transformados en un nuevo flujo para que los consumidores los observen.

Código | fuentePaquete (PyPi) | Paquete (Conda) | Documentación | de referencia de APIDocumentación | del productoMuestras

Introducción

Requisitos previos

  • Python 3.7 o superior.

  • Suscripción de Microsoft Azure: Para usar los servicios de Azure, incluida Azure Event Hubs, necesitará una suscripción. Si no tiene una cuenta de Azure existente, puede registrarse para obtener una evaluación gratuita o usar las ventajas del suscriptor de MSDN al crear una cuenta.

  • Espacio de nombres de Event Hubs con un centro de eventos: Para interactuar con Azure Event Hubs, también deberá tener un espacio de nombres y un centro de eventos disponibles. Si no está familiarizado con la creación de recursos de Azure, puede seguir la guía paso a paso para crear un centro de eventos mediante el Azure Portal. Allí también puede encontrar instrucciones detalladas para usar la CLI de Azure, Azure PowerShell o plantillas de Azure Resource Manager (ARM) para crear un centro de eventos.

Instalar el paquete

Instale la biblioteca cliente de Azure Event Hubs para Python con pip:

$ pip install azure-eventhub

Autenticar el cliente

La interacción con Event Hubs comienza con una instancia de la clase EventHubConsumerClient o EventHubProducerClient. Necesita el nombre de host, la credencial de SAS/AAD y el nombre del centro de eventos o un cadena de conexión para crear instancias del objeto de cliente.

Cree un cliente a partir de cadena de conexión:

Para que la biblioteca cliente de Event Hubs interactúe con un centro de eventos, los medios más fáciles es usar un cadena de conexión, que se crea automáticamente al crear un espacio de nombres de Event Hubs. Si no está familiarizado con las directivas de acceso compartido en Azure, puede seguir la guía paso a paso para obtener un cadena de conexión de Event Hubs.

  • El from_connection_string método toma el cadena de conexión del formulario Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> y el nombre de entidad en la instancia del centro de eventos. Puede obtener la cadena de conexión de Azure Portal.

Cree un cliente mediante la biblioteca azure-identity:

Como alternativa, se puede usar un objeto Credential para autenticarse a través de AAD con el paquete azure-identity.

  • Este constructor mostrado en el ejemplo vinculado anteriormente toma el nombre de host y el nombre de entidad de la instancia del centro de eventos y las credenciales que implementan el protocolo TokenCredential . Hay implementaciones del TokenCredential protocolo disponibles en el paquete azure-identity. El nombre de host tiene el formato <yournamespace.servicebus.windows.net>.
  • Para usar los tipos de credenciales proporcionados por azure-identity, instale el paquete: pip install azure-identity
  • Además, para usar la API asincrónica, primero debe instalar un transporte asincrónico, como aiohttp: pip install aiohttp
  • Al usar Azure Active Directory, la entidad de seguridad debe tener asignado un rol que permita el acceso a Event Hubs, como el rol propietario de datos de Azure Event Hubs. Para más información sobre el uso de la autorización de Azure Active Directory con Event Hubs, consulte la documentación asociada.

Conceptos clave

  • EventHubProducerClient es un origen de datos de telemetría, información de diagnóstico, registros de uso u otros datos de registro, como parte de una solución de dispositivo insertada, una aplicación de dispositivo móvil, un título de juego que se ejecuta en una consola u otro dispositivo, alguna solución empresarial basada en cliente o servidor o un sitio web.

  • Un EventHubConsumerClient recoge dicha información del centro de eventos y la procesa. El procesamiento puede implicar la agregación, el cálculo complejo y el filtrado. El procesamiento también puede implicar la distribución o el almacenamiento de la información sin procesar o transformada. Los consumidores de centros de eventos suelen ser componentes sólidos y a gran escala de la infraestructura de la plataforma con funcionalidades de análisis integradas, como Azure Stream Analytics, Apache Spark o Apache Storm.

  • Una partición es una secuencia ordenada de eventos que se mantiene en un centro de eventos. Azure Event Hubs proporciona streaming de mensajes a través de un patrón de consumidor con particiones en el que cada consumidor solo lee un subconjunto específico, o partición, de la secuencia de mensajes. A medida que llegan eventos más recientes, se agregan al final de esta secuencia. El número de particiones se especifica en el momento en que se crea un centro de eventos y no se puede modificar.

  • Un grupo de consumidores es una vista de un centro de eventos completo. Los grupos de consumidores habilitan a varias aplicaciones de consumo para que cada una de ellas tenga una vista independiente de la secuencia de eventos, y para que lea la secuencia de manera independiente a su propio ritmo y desde su propia ubicación. Puede haber como máximo cinco lectores simultáneos en una partición por grupo de consumidores; sin embargo, se recomienda que solo haya un consumidor activo para un emparejamiento determinado de partición y grupo de consumidores. Cada lector activo recibe todos los eventos de su partición; si hay varios lectores en la misma partición, recibirán eventos duplicados.

Para obtener más conceptos y una explicación más detallada, consulte: Características de Event Hubs. Además, los conceptos de AMQP están bien documentados en OASIS Advanced Messaging Queuing Protocol (AMQP) Versión 1.0.

Seguridad para subprocesos

No garantizamos que EventHubProducerClient o EventHubConsumerClient sean seguros para subprocesos. No se recomienda reutilizar estas instancias entre subprocesos. La aplicación en ejecución puede usar estas clases de forma segura para subprocesos.

El tipo de modelo de datos, EventDataBatch no es seguro para subprocesos. No debe compartirse entre subprocesos ni usarse simultáneamente con métodos de cliente.

Ejemplos

En las secciones siguientes se proporcionan varios fragmentos de código que abarcan algunas de las tareas más comunes de Event Hubs, entre las que se incluyen:

Inspección de un centro de eventos

Obtenga los identificadores de partición de un centro de eventos.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

Publicación de eventos en un centro de eventos

Use el create_batch método en EventHubProducerClient para crear un EventDataBatch objeto que se puede enviar mediante el send_batch método . Los eventos se pueden agregar al uso del EventDataBatchadd método hasta que se haya alcanzado el límite máximo de tamaño de lote en bytes.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

Consumo de eventos de un centro de eventos

Hay varias maneras de consumir eventos desde un EventHub. Para desencadenar simplemente una devolución de llamada cuando se recibe un evento, el EventHubConsumerClient.receive método será de uso como se indica a continuación:

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

Consumo de eventos de un centro de eventos en lotes

Mientras que el ejemplo anterior desencadena la devolución de llamada para cada mensaje a medida que se recibe, el ejemplo siguiente desencadena la devolución de llamada en un lote de eventos, intentando recibir un número a la vez.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

Publicar eventos en un centro de eventos de forma asincrónica

Use el create_batch método en EventHubProducer para crear un EventDataBatch objeto que se puede enviar mediante el send_batch método . Los eventos se pueden agregar al uso del EventDataBatchadd método hasta que se haya alcanzado el límite máximo de tamaño de lote en bytes.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

Consumir eventos de un centro de eventos de forma asincrónica

Este SDK admite código sincrónico y basado en asyncio. Para recibir como se muestra en los ejemplos anteriores, pero dentro de aio, necesitaría lo siguiente:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

Consumo de eventos de un centro de eventos en lotes de forma asincrónica

Todas las funciones sincrónicas también se admiten en aio. Como se ha mostrado anteriormente para la recepción por lotes sincrónica, se puede lograr lo mismo dentro de asyncio de la siguiente manera:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

Consumo de eventos y guardado de puntos de control mediante un almacén de puntos de control

EventHubConsumerClient es una construcción de alto nivel que permite recibir eventos de varias particiones a la vez y equilibrar la carga con otros consumidores mediante el mismo centro de eventos y grupo de consumidores.

Esto también permite al usuario realizar un seguimiento del progreso cuando se procesan los eventos mediante puntos de control.

Un punto de control está pensado para representar el último evento procesado correctamente por el usuario desde una partición determinada de un grupo de consumidores en una instancia del centro de eventos. EventHubConsumerClient usa una instancia de para actualizar los puntos de CheckpointStore control y para almacenar la información pertinente requerida por el algoritmo de equilibrio de carga.

Busque pypi con el prefijo azure-eventhub-checkpointstore para buscar paquetes que lo admitan y use la CheckpointStore implementación de un paquete de este tipo. Tenga en cuenta que se proporcionan bibliotecas sincronizadas y asincrónicas.

En el ejemplo siguiente, se crea una instancia de EventHubConsumerClient y se usa .BlobCheckpointStore Debe crear una cuenta de Azure Storage y un contenedor de blobs para ejecutar el código.

Azure Blob Storage Async y Azure Blob Storage Checkpoint Store Sync son una de las CheckpointStore implementaciones que proporcionamos que se aplica Azure Blob Storage como almacén persistente.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Uso de EventHubConsumerClient para trabajar con IoT Hub

También puede usar EventHubConsumerClient para trabajar con IoT Hub. Esto es útil para recibir datos de telemetría de IoT Hub desde el centro de eventos vinculado. El cadena de conexión asociado no tendrá notificaciones de envío, por lo que no es posible enviar eventos.

Tenga en cuenta que el cadena de conexión debe ser para un punto de conexión compatible con Event Hubs, por ejemplo, "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"

Hay dos maneras de obtener el punto de conexión compatible con Event Hubs:

  • Obtenga manualmente los "puntos de conexión integrados" del IoT Hub en Azure Portal y recibalos.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

Solución de problemas

Consulte la azure-eventhubsguía de solución de problemas para obtener más información sobre cómo diagnosticar varios escenarios de error.

Pasos siguientes

Más código de ejemplo

Eche un vistazo al directorio de ejemplos para obtener ejemplos detallados de cómo usar esta biblioteca para enviar y recibir eventos hacia y desde Event Hubs.

Documentación

La documentación de referencia está disponible aquí.

Registro de esquemas y Codificador avro

El SDK de EventHubs se integra perfectamente con el servicio Registro de esquemas y Avro. Para obtener más información, consulte SDK del Registro de esquemas y SDK de Avro Encoder del registro de esquemas.

Compatibilidad con transporte y compatibilidad con versiones anteriores de AMQP de Pure Python

La biblioteca cliente de Azure Event Hubs ahora se basa en una implementación de AMQP de Python pura. uAMQP se ha quitado como dependencia necesaria.

Para usar uAMQP como transporte subyacente:

  1. Instale uamqp con pip.
$ pip install uamqp 
  1. Pase uamqp_transport=True durante la construcción del cliente.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

Nota: El message atributo en EventData/EventDataBatch, que anteriormente expone , uamqp.Messageha quedado en desuso. Se han introducido los objetos "heredados" devueltos por EventData.message/EventDataBatch.message para facilitar la transición.

Creación de rueda uAMQP desde el origen

Si uAMQP está pensado para usarse como la implementación subyacente del protocolo AMQP para azure-eventhub, se pueden encontrar ruedas uAMQP para la mayoría de los sistemas operativos principales.

Si piensa usar uAMQP y se ejecuta en una plataforma para la que no se proporcionan ruedas uAMQP, siga las instrucciones de instalación de uAMQP para instalar desde el origen.

Envío de comentarios

Si encuentra algún error o tiene sugerencias, envíe un problema en la sección Problemas del proyecto.

Contribuciones

Este proyecto agradece las contribuciones y sugerencias. La mayoría de las contribuciones requieren que acepte un Contrato de licencia para el colaborador (CLA) que declara que tiene el derecho a concedernos y nos concede los derechos para usar su contribución. Para más detalles, visite https://cla.microsoft.com.

Cuando se envía una solicitud de incorporación de cambios, un bot de CLA determinará de forma automática si tiene que aportar un CLA y completar la PR adecuadamente (por ejemplo, la etiqueta, el comentario). Solo siga las instrucciones que le dará el bot. Solo será necesario que lo haga una vez en todos los repositorios con nuestro CLA.

Este proyecto ha adoptado el Código de conducta de Microsoft Open Source. Para más información, consulte las preguntas más frecuentes del código de conducta o póngase en contacto con opencode@microsoft.com si tiene cualquier otra pregunta o comentario.

Impresiones