Bibliothèque de client Avro Serializer pour Python - version 1.0.0b4 d’Azure Schema Registry
Azure Schema Registry est un service de référentiel de schémas hébergé par Azure Event Hubs, qui fournit le stockage des schémas, le contrôle de version et la gestion. Ce package fournit un sérialiseur Avro capable de sérialiser et de désérialiser des charges utiles contenant des identificateurs de schéma du registre de schémas et des données encodées par Avro.
| Code sourcePackage (PyPi) | Documentation de référence sur les | API Échantillons | Changelog
La prise en charge des packages Python du SDK Azure pour Python 2.7 prend fin le 1er janvier 2022. Pour obtenir plus d’informations et poser des questions, reportez-vous à https://github.com/Azure/azure-sdk-for-python/issues/20691
Installez la bibliothèque de client Azure Schema Registry Avro Serializer et la bibliothèque de client Azure Identity pour Python avec pip :
pip install azure-schemaregistry-avroserializer azure-identity
Pour utiliser ce package, vous devez disposer des conditions suivantes :
- Abonnement Azure - Créer un compte gratuit
- Azure Schema Registry
- Python 2.7, 3.6 ou version ultérieure - Installer Python
L’interaction avec le sérialiseur Avro Du registre de schémas commence par une instance de la classe AvroSerializer, qui prend le nom du groupe de schémas et la classe cliente Schema Registry . Le constructeur client prend l’espace de noms complet Event Hubs et les informations d’identification Azure Active Directory :
L’espace de noms complet de l’instance de Registre de schémas doit suivre le format :
<yournamespace>.servicebus.windows.net
.Les informations d’identification AAD qui implémentent le protocole TokenCredential doivent être passées au constructeur. Des implémentations du
TokenCredential
protocole sont disponibles dans le package azure-identity. Pour utiliser les types d’informations d’identification fournis parazure-identity
, installez la bibliothèque de client Azure Identity pour Python avec pip :
pip install azure-identity
- En outre, pour utiliser l’API asynchrone prise en charge sur Python 3.6+, vous devez d’abord installer un transport asynchrone, tel que aiohttp :
pip install aiohttp
Créez AvroSerializer à l’aide de la bibliothèque azure-schemaregistry :
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
Fournit une API pour sérialiser et désérialiser à partir d’Avro Binary Encoding, ainsi qu’un en-tête avec l’ID de schéma. Utilise SchemaRegistryClient pour obtenir des ID de schéma à partir du contenu du schéma ou vice versa.
Le même format est utilisé par les sérialiseurs de registre de schémas dans les langages du Kit de développement logiciel (SDK) Azure.
Les messages sont encodés comme suit :
4 octets : Indicateur de format
- Actuellement, toujours zéro pour indiquer le format ci-dessous.
32 octets : ID de schéma
- Représentation hexadécimale UTF-8 du GUID.
- 32 chiffres hexadécimaux, pas de traits d’union.
- Même format et ordre d’octets que la chaîne du service Registre de schémas.
Octets restants : charge utile Avro (en général, charge utile spécifique au format)
- Encodage binaire Avro
- NOT Avro Object Container File, qui inclut le schéma et va à l’encontre de l’objectif de ce serialzer de déplacer le schéma hors de la charge utile de message et dans le registre de schémas.
Les sections suivantes fournissent plusieurs extraits de code couvrant certaines des tâches les plus courantes du Registre de schémas, notamment :
- Sérialisation
- Désérialisation
- Intégration d’envoi d’Event Hubs
- Intégration de réception d’Event Hubs
Utilisez AvroSerializer.serialize
la méthode pour sérialiser des données de dict avec le schéma avro donné.
La méthode utilise un schéma précédemment inscrit auprès du service Registre de schémas et conserve le schéma mis en cache pour une utilisation ultérieure de la sérialisation. Il est également possible d’éviter de préinscrire le schéma auprès du service et de l’inscrire automatiquement avec la serialize
méthode en instanciant avec AvroSerializer
l’argument auto_register_schemas=True
de mot clé .
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
encoded_bytes = serializer.serialize(dict_data, schema=definition)
Utilisez AvroSerializer.deserialize
la méthode pour désérialiser des octets bruts dans des données de dictée.
La méthode récupère automatiquement le schéma à partir du service de registre de schémas et conserve le schéma mis en cache pour une utilisation ultérieure de la désérialisation.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
decoded_data = serializer.deserialize(encoded_bytes)
Intégration à Event Hubs pour envoyer des données avro dict sérialisées en tant que corps d’EventData.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_serializer:
event_data_batch = eventhub_producer.create_batch()
dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
event_data_batch.add(EventData(body=payload_bytes))
eventhub_producer.send_batch(event_data_batch)
Intégration à Event Hubs pour recevoir EventData
et désérialiser des octets bruts dans des données de dictée avro.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
bytes_payload = b"".join(b for b in event.body)
deserialized_data = avro_serializer.deserialize(bytes_payload)
with eventhub_consumer, avro_serializer:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Azure Schema Registry Avro Serializer déclenche des exceptions définies dans Azure Core.
Cette bibliothèque utilise la bibliothèque de journalisation standard pour la journalisation. Les informations de base sur les sessions HTTP (URL, en-têtes, etc.) sont enregistrées au niveau INFO.
La journalisation détaillée au niveau DEBUG, y compris les corps de requête/réponse et les en-têtes non expurgés, peut être activée sur un client avec l’argument logging_enable
:
import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")
De la même façon, logging_enable
peut activer la journalisation détaillée pour une seule opération, même quand elle n’est pas activée pour le client :
serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)
Vous trouverez d’autres exemples dans le répertoire d’exemples illustrant les scénarios courants du registre de schémas Azure Avro Serializer.
Ce projet accepte les contributions et les suggestions. La plupart des contributions vous demandent d’accepter un contrat de licence de contribution (CLA) déclarant que vous avez le droit de nous accorder, et que vous nous accordez réellement, les droits d’utilisation de votre contribution. Pour plus d’informations, visitez https://cla.microsoft.com.
Quand vous envoyez une demande de tirage (pull request), un bot CLA détermine automatiquement si vous devez fournir un contrat CLA et agrémenter la demande de tirage de façon appropriée (par exemple, avec une étiquette ou un commentaire). Suivez simplement les instructions fournies par le bot. Vous ne devez effectuer cette opération qu’une seule fois sur tous les dépôts utilisant notre contrat CLA.
Ce projet a adopté le Code de conduite Open Source de Microsoft. Pour plus d’informations, consultez les Questions fréquentes (FAQ) sur le code de conduite ou envoyez vos questions ou vos commentaires à opencode@microsoft.com.
Commentaires sur Azure SDK for Python
Azure SDK for Python est un projet open source. Sélectionnez un lien pour fournir des commentaires :