Compartir a través de


Biblioteca cliente de Avro Encoder del Registro de esquema de Azure para Python: versión 1.0.0

Azure Schema Registry es un servicio de repositorio de esquemas hospedado por Azure Event Hubs, lo que proporciona almacenamiento de esquemas, control de versiones y administración. Este paquete proporciona un codificador Avro capaz de codificar y descodificar cargas que contienen identificadores de esquema del Registro de esquema y contenido codificado en Avro.

Código | fuente Paquete (PyPi) | Documentación | de referencia de APIMuestras | Changelog

Declinación de responsabilidades

Los paquetes de Python del SDK de Azure para Python 2.7 finalizaron el 01 de enero de 2022. Para más información y preguntas, consulte https://github.com/Azure/azure-sdk-for-python/issues/20691.

Introducción

Instalar el paquete

Instale la biblioteca cliente de Avro Encoder del Registro de esquemas de Azure para Python con pip:

pip install azure-schemaregistry-avroencoder

Requisitos previos:

Para usar este paquete, debe tener:

Autenticar el cliente

La interacción con el codificador Avro del Registro de esquemas comienza con una instancia de la clase AvroEncoder, que toma el nombre del grupo de esquemas y la clase Client del Registro de esquema. El constructor de cliente toma el espacio de nombres completo de Event Hubs y las credenciales de Azure Active Directory:

  • El espacio de nombres completo de la instancia del Registro de esquema debe seguir el formato : <yournamespace>.servicebus.windows.net.

  • Se debe pasar una credencial de AAD que implemente el protocolo TokenCredential al constructor. Hay implementaciones del TokenCredential protocolo disponibles en el paquete azure-identity. Para usar los tipos de credenciales proporcionados por azure-identity, instale la biblioteca cliente de Identidad de Azure para Python con pip:

pip install azure-identity
  • Además, para usar la API asincrónica, primero debe instalar un transporte asincrónico, como aiohttp:
pip install aiohttp

Cree AvroEncoder mediante la biblioteca azure-schemaregistry:

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Conceptos clave

AvroEncoder

Proporciona api para codificar y descodificar de Avro Binary Encoding más un tipo de contenido con el identificador de esquema. Usa SchemaRegistryClient para obtener identificadores de esquema del contenido del esquema o viceversa.

Modelos de mensajes admitidos

Se ha agregado compatibilidad a determinadas clases de modelo del SDK de mensajería de Azure para la interoperabilidad con AvroEncoder. Estos modelos son subtipos del MessageType protocolo definido en el azure.schemaregistry.encoder.avroencoder espacio de nombres . Actualmente, las clases de modelo admitidas son:

  • azure.eventhub.EventData para azure-eventhub>=5.9.0

Formato de los mensajes

Si se proporciona un tipo de mensaje que sigue al protocolo MessageType al codificador para la codificación, establecerá las propiedades de contenido y tipo de contenido correspondientes, donde:

  • content: carga útil de Avro (en general, carga específica del formato)

    • Codificación binaria de Avro
    • NOT Avro Object Container File, que incluye el esquema y derrota el propósito de este codificador para mover el esquema fuera de la carga del mensaje y al registro de esquema.
  • content type: una cadena del formato avro/binary+<schema ID>, donde:

    • avro/binary es el indicador de formato.
    • <schema ID> es la representación hexadecimal del GUID, el mismo formato y orden de bytes que la cadena del servicio Registro de esquemas.

Si EventData se pasa como tipo de mensaje, se establecerán las siguientes propiedades en el EventData objeto :

  • La body propiedad se establecerá en el valor de contenido.
  • La content_type propiedad se establecerá en el valor del tipo de contenido.

Si no se proporciona el tipo de mensaje y, de forma predeterminada, el codificador creará el siguiente dict: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

Ejemplos

En las secciones siguientes se proporcionan varios fragmentos de código que abarcan algunas de las tareas más comunes del Registro de esquemas, entre las que se incluyen:

Encoding

Use el AvroEncoder.encode método para codificar el contenido con el esquema avro especificado. El método usará un esquema registrado previamente en el servicio Registro de esquemas y mantendrá el esquema almacenado en caché para un uso de codificación futuro. Para evitar el registro previo del esquema en el servicio y registrarlo automáticamente con el método , el argumento auto_register=True de encode palabra clave debe pasarse al AvroEncoder constructor.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

Descodificación

Use el AvroEncoder.decode método para descodificar el contenido codificado en Avro mediante:

  • Pasar un objeto de mensaje que es un subtipo del protocolo MessageType.
  • Pasar un dict con claves content(bytes de tipo) y content_type (cadena de tipo). El método recupera automáticamente el esquema del servicio registro de esquemas y mantiene el esquema almacenado en caché para un uso de descodificación futuro.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Integración de envío de Event Hubs

Integración con Event Hubs para enviar un EventData objeto con body establecido en contenido codificado en Avro y correspondiente content_type.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Integración de recepción de Event Hubs

Integración con Event Hubs para recibir un EventData objeto y descodificar el valor codificado body en Avro.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Solución de problemas

General

El codificador Avro del Registro de esquemas de Azure genera excepciones definidas en Azure Core si se producen errores al comunicarse con el servicio Registro de esquemas. Los errores relacionados con los tipos de contenido o contenido no válidos y los esquemas no válidos se generarán como azure.schemaregistry.encoder.avroencoder.InvalidContentError y azure.schemaregistry.encoder.avroencoder.InvalidSchemaError, respectivamente, donde __cause__ contendrá la excepción subyacente generada por la biblioteca de Apache Avro.

Registro

Esta biblioteca usa la biblioteca de registro estándar para el registro. La información básica sobre las sesiones HTTP (direcciones URL, encabezados, etc.) se registra en el nivel INFO.

El registro detallado de nivel DEBUG, incluidos los cuerpos de solicitud/respuesta y los encabezados no aprobados, se puede habilitar en un cliente con el logging_enable argumento :

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Igualmente, logging_enable puede habilitar el registro detallado de una sola operación, aunque no esté habilitado para el cliente:

encoder.encode(dict_content, schema=definition, logging_enable=True)

Pasos siguientes

Más código de ejemplo

Otros ejemplos que muestran escenarios comunes de Avro Encoder del Registro de esquemas de Azure se encuentran en el directorio de ejemplos .

Contribuciones

Este proyecto agradece las contribuciones y sugerencias. La mayoría de las contribuciones requieren que acepte un Contrato de licencia para el colaborador (CLA) que declara que tiene el derecho a concedernos y nos concede los derechos para usar su contribución. Para más detalles, visite https://cla.microsoft.com.

Cuando se envía una solicitud de incorporación de cambios, un bot de CLA determinará de forma automática si tiene que aportar un CLA y completar la PR adecuadamente (por ejemplo, la etiqueta, el comentario). Solo siga las instrucciones que le dará el bot. Solo será necesario que lo haga una vez en todos los repositorios con nuestro CLA.

Este proyecto ha adoptado el Código de conducta de Microsoft Open Source. Para más información, consulte las preguntas más frecuentes del código de conducta o póngase en contacto con opencode@microsoft.com si tiene cualquier otra pregunta o comentario.