Partager via


Bibliothèque cliente Azure Schema Registry Avro Encoder pour Python - version 1.0.0

Azure Schema Registry est un service de référentiel de schémas hébergé par Azure Event Hubs, qui fournit le stockage des schémas, le contrôle de version et la gestion. Ce package fournit un encodeur Avro capable d’encoder et de décoder des charges utiles contenant des identificateurs de schéma du registre de schémas et du contenu encodé avro.

| Code sourcePackage (PyPi) | Documentation de référence sur les | API Échantillons | Changelog

Clause d’exclusion de responsabilité

La prise en charge des packages Python du SDK Azure pour Python 2.7 a pris fin le 1er janvier 2022. Pour obtenir plus d’informations et poser des questions, reportez-vous à https://github.com/Azure/azure-sdk-for-python/issues/20691

Prise en main

Installer le package

Installez la bibliothèque cliente Azure Schema Registry Avro Encoder pour Python avec pip :

pip install azure-schemaregistry-avroencoder

Configuration requise :

Pour utiliser ce package, vous devez disposer des éléments suivants :

Authentifier le client

L’interaction avec l’encodeur Avro Registry de schémas commence par une instance de la classe AvroEncoder, qui prend le nom du groupe de schémas et la classe cliente Schema Registry . Le constructeur client prend l’espace de noms complet Event Hubs et les informations d’identification Azure Active Directory :

  • L’espace de noms complet de l’instance du Registre de schémas doit suivre le format : <yournamespace>.servicebus.windows.net.

  • Les informations d’identification AAD qui implémentent le protocole TokenCredential doivent être passées au constructeur. Des implémentations du TokenCredential protocole sont disponibles dans le package azure-identity. Pour utiliser les types d’informations d’identification fournis par azure-identity, installez la bibliothèque cliente Azure Identity pour Python avec pip :

pip install azure-identity
  • En outre, pour utiliser l’API asynchrone, vous devez d’abord installer un transport asynchrone, tel que aiohttp :
pip install aiohttp

Créez AvroEncoder à l’aide de la bibliothèque azure-schemaregistry :

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Concepts clés

AvroEncoder

Fournit une API pour encoder et décoder à partir d’Avro Binary Encoding plus un type de contenu avec l’ID de schéma. Utilise SchemaRegistryClient pour obtenir des ID de schéma à partir du contenu du schéma ou inversement.

Modèles de message pris en charge

La prise en charge a été ajoutée à certaines classes de modèles du KIT de développement logiciel (SDK) Azure Messaging pour l’interopérabilité avec .AvroEncoder Ces modèles sont des sous-types du MessageType protocole défini sous l’espace de azure.schemaregistry.encoder.avroencoder noms. Actuellement, les classes de modèle prises en charge sont les suivantes :

  • azure.eventhub.EventData pour azure-eventhub>=5.9.0

Format de message

Si un type de message qui suit le protocole MessageType est fourni à l’encodeur pour l’encodage, il définit les propriétés de contenu et de type de contenu correspondantes, où :

  • content: charge utile Avro (en général, charge utile spécifique au format)

    • Encodage binaire Avro
    • NOT Avro Object Container File, qui inclut le schéma et contredise l’objectif de cet encodeur de déplacer le schéma hors de la charge utile du message et dans le registre de schémas.
  • content type: chaîne du format avro/binary+<schema ID>, où :

    • avro/binary est l’indicateur de format
    • <schema ID> est la représentation hexadécimale du GUID, le même format et l’ordre d’octet que la chaîne du service Registre de schémas.

Si EventData est transmis en tant que type de message, les propriétés suivantes sont définies sur l’objet EventData :

  • La body propriété sera définie sur la valeur de contenu.
  • La content_type propriété sera définie sur la valeur du type de contenu.

Si le type de message n’est pas fourni et par défaut, l’encodeur crée la dict suivante : {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

Exemples

Les sections suivantes fournissent plusieurs extraits de code couvrant certaines des tâches les plus courantes du Registre de schémas, notamment :

Encodage

Utilisez la AvroEncoder.encode méthode pour encoder du contenu avec le schéma Avro donné. La méthode utilise un schéma précédemment inscrit auprès du service Registre de schémas et conserve le schéma mis en cache pour une utilisation future de l’encodage. Pour éviter de préinscrire le schéma auprès du service et de l’inscrire automatiquement auprès de la encode méthode, l’argument auto_register=True de mot clé doit être passé au AvroEncoder constructeur.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

Décodage

Utilisez la AvroEncoder.decode méthode pour décoder le contenu encodé avro par :

  • Passage d’un objet de message qui est un sous-type du protocole MessageType.
  • Passage d’une dict avec des clés content(type octets) et content_type (type string). La méthode récupère automatiquement le schéma à partir du service De Registre de schémas et conserve le schéma mis en cache pour une utilisation ultérieure du décodage.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Intégration d’envoi d’Event Hubs

Intégration à Event Hubs pour envoyer un EventData objet avec body défini sur le contenu encodé avro et correspondant content_type.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Intégration de réception d’Event Hubs

Intégration à Event Hubs pour recevoir un EventData objet et décoder la valeur encodée body avro.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Dépannage

Général

Azure Schema Registry Avro Encoder déclenche des exceptions définies dans Azure Core si des erreurs sont rencontrées lors de la communication avec le service Schema Registry. Les erreurs liées aux types de contenu/contenu non valides et aux schémas non valides seront générées en tant que azure.schemaregistry.encoder.avroencoder.InvalidContentError et azure.schemaregistry.encoder.avroencoder.InvalidSchemaError, respectivement, où __cause__ contiendra l’exception sous-jacente levée par la bibliothèque Apache Avro.

Journalisation

Cette bibliothèque utilise la bibliothèque de journalisation standard pour la journalisation. Les informations de base sur les sessions HTTP (URL, en-têtes, etc.) sont enregistrées au niveau INFO.

La journalisation détaillée au niveau DEBUG, y compris les corps de requête/réponse et les en-têtes non expurgés, peut être activée sur un client avec l’argument logging_enable :

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

De la même façon, logging_enable peut activer la journalisation détaillée pour une seule opération, même quand elle n’est pas activée pour le client :

encoder.encode(dict_content, schema=definition, logging_enable=True)

Étapes suivantes

Autres exemples de code

D’autres exemples illustrant les scénarios Courants d’Azure Schema Registry Avro Encoder se trouvent dans le répertoire d’exemples .

Contribution

Ce projet accepte les contributions et les suggestions. La plupart des contributions vous demandent d’accepter un contrat de licence de contribution (CLA) déclarant que vous avez le droit de nous accorder, et que vous nous accordez réellement, les droits d’utilisation de votre contribution. Pour plus d’informations, visitez https://cla.microsoft.com.

Quand vous envoyez une demande de tirage (pull request), un bot CLA détermine automatiquement si vous devez fournir un contrat CLA et agrémenter la demande de tirage de façon appropriée (par exemple, avec une étiquette ou un commentaire). Suivez simplement les instructions fournies par le bot. Vous ne devez effectuer cette opération qu’une seule fois sur tous les dépôts utilisant notre contrat CLA.

Ce projet a adopté le Code de conduite Open Source de Microsoft. Pour plus d’informations, consultez les Questions fréquentes (FAQ) sur le code de conduite ou envoyez vos questions ou vos commentaires à opencode@microsoft.com.