Azure Schema Registry Avro Encoder-Clientbibliothek für Python – Version 1.0.0
Azure Schema Registry ist ein von Azure Event Hubs gehosteter Schemarepositorydienst, der Schemaspeicher, Versionsverwaltung und -verwaltung bereitstellt. Dieses Paket stellt einen Avro-Encoder bereit, der Nutzdaten codieren und decodieren kann, die Schemaregistrierungsschemabezeichner und Avro-codierte Inhalte enthalten.
Quellcode | Paket (PyPi) | API-Referenzdokumentation | Proben | Changelog
Haftungsausschluss
Die Unterstützung von Python-Paketen für Das Azure SDK für Python 2.7 wurde am 01. Januar 2022 eingestellt. Weitere Informationen und Antworten finden Sie unter https://github.com/Azure/azure-sdk-for-python/issues/20691.
Erste Schritte
Installieren des Pakets
Installieren Sie die Azure Schema Registry Avro Encoder-Clientbibliothek für Python mit pip:
pip install azure-schemaregistry-avroencoder
Voraussetzungen:
Um dieses Paket verwenden zu können, benötigen Sie Folgendes:
- Azure-Abonnement (Erstellen Sie ein kostenloses Konto.)
- Azure Schema Registry - Hier finden Sie die Schnellstartanleitung zum Erstellen einer Schemaregistrierungsgruppe mit dem Azure-Portal.
- Python 3.6 oder höher: Installieren von Python
Authentifizieren des Clients
Die Interaktion mit dem Avro Encoder der Schemaregistrierung beginnt mit einer Instanz der AvroEncoder-Klasse, die den Schemagruppennamen und die Schemaregistrierungsclientklasse übernimmt. Der Clientkonstruktor übernimmt den vollqualifizierten Event Hubs-Namespace und Azure Active Directory-Anmeldeinformationen:
Der vollqualifizierte Namespace der Schemaregistrierungsinstanz sollte das folgende Format aufweisen:
<yournamespace>.servicebus.windows.net
.AAD-Anmeldeinformationen, die das TokenCredential-Protokoll implementieren, sollten an den Konstruktor übergeben werden. Im Paket azure-identity sind Implementierungen des
TokenCredential
Protokolls verfügbar. Um die vonazure-identity
bereitgestellten Anmeldeinformationstypen zu verwenden, installieren Sie die Azure Identity-Clientbibliothek für Python mit pip:
pip install azure-identity
- Außerdem müssen Sie für die Verwendung der asynchronen API zunächst einen asynchronen Transport installieren, z. B. aiohttp:
pip install aiohttp
Erstellen Sie AvroEncoder mithilfe der Bibliothek azure-schemaregistry:
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Wichtige Begriffe
AvroEncoder
Stellt eine API zum Codieren und Decodieren von Avro Binary Encoding sowie einen Inhaltstyp mit Schema-ID bereit. Verwendet SchemaRegistryClient , um Schema-IDs aus Schemainhalten abzurufen oder umgekehrt.
Unterstützte Nachrichtenmodelle
Unterstützung für bestimmte Azure Messaging SDK-Modellklassen wurde für die AvroEncoder
Interoperabilität mit hinzugefügt. Bei diesen Modellen handelt es sich um Untertypen des MessageType
unter dem azure.schemaregistry.encoder.avroencoder
Namespace definierten Protokolls. Derzeit werden folgende Modellklassen unterstützt:
azure.eventhub.EventData
fürazure-eventhub>=5.9.0
Nachrichtenformat
Wenn ein Nachrichtentyp, der dem MessageType-Protokoll folgt, dem Encoder für die Codierung bereitgestellt wird, werden die entsprechenden Inhalts- und Inhaltstypeigenschaften festgelegt, wobei Folgendes gilt:
content
: Avro-Nutzlast (im Allgemeinen formatspezifische Nutzlast)- Avro Binärcodierung
- NICHT Avro-Objektcontainerdatei, die das Schema enthält und den Zweck dieses Encoders verfehlt, das Schema aus der Nachrichtennutzlast in die Schemaregistrierung zu verschieben.
content type
: eine Zeichenfolge im Formatavro/binary+<schema ID>
, wobei:avro/binary
ist der Formatindikator<schema ID>
ist die hexadezimale Darstellung der GUID, gleiches Format und die gleiche Bytereihenfolge wie die Zeichenfolge aus dem Schemaregistrierungsdienst.
Wenn EventData
als Nachrichtentyp übergeben wird, werden die folgenden Eigenschaften für das EventData
-Objekt festgelegt:
- Die
body
-Eigenschaft wird auf den Inhaltswert festgelegt. - Die
content_type
-Eigenschaft wird auf den Inhaltstypwert festgelegt.
Wenn der Nachrichtentyp nicht angegeben ist, erstellt der Encoder standardmäßig das folgende Dict: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
Beispiele
Die folgenden Abschnitte enthalten mehrere Codeausschnitte, die einige der am häufigsten verwendeten Schemaregistrierungsaufgaben behandeln, einschließlich:
Codierung
Verwenden Sie die AvroEncoder.encode
-Methode, um Inhalte mit dem angegebenen Avro-Schema zu codieren.
Die -Methode verwendet ein Schema, das zuvor beim Schemaregistrierungsdienst registriert wurde, und behält das Schema für die zukünftige Codierungsverwendung zwischengespeichert. Um eine Vorregistrierung des Schemas beim Dienst zu vermeiden und es automatisch bei der encode
-Methode zu registrieren, sollte das Schlüsselwortargument auto_register=True
an den AvroEncoder
Konstruktor übergeben werden.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Decodierung
Verwenden Sie die AvroEncoder.decode
-Methode, um den Avro-codierten Inhalt durch einen der folgenden Methoden zu decodieren:
- Übergeben eines Nachrichtenobjekts, das ein Untertyp des MessageType-Protokolls ist.
- Übergeben eines Dikts mit Schlüsseln
content
(Typ bytes) undcontent_type
(Typzeichenfolge). Die -Methode ruft das Schema automatisch aus dem Schemaregistrierungsdienst ab und speichert das Schema für die zukünftige Decodierungsverwendung zwischen.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Event Hubs-Sendeintegration
Integration mit Event Hubs zum Senden eines EventData
Objekts mit body
festgelegtem Avro-codierten Inhalt und entsprechenden content_type
.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Event Hubs, die Integration empfangen
Integration mit Event Hubs , um ein EventData
Objekt zu empfangen und den Avro-codierten Wert zu decodieren body
.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Problembehandlung
Allgemein
Azure Schema Registry Avro Encoder löst in Azure Core definierte Ausnahmen aus, wenn bei der Kommunikation mit dem Schemaregistrierungsdienst Fehler auftreten. Fehler im Zusammenhang mit ungültigen Inhaltstypen und ungültigen Schemas werden als azure.schemaregistry.encoder.avroencoder.InvalidContentError
bzw azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
. ausgelöst, wobei __cause__
die zugrunde liegende Ausnahme enthalten wird, die von der Apache Avro-Bibliothek ausgelöst wird.
Protokollierung
Diese Bibliothek verwendet die Standardprotokollierungsbibliothek für die Protokollierung. Grundlegende Informationen zu HTTP-Sitzungen (URLs, Header usw.) werden auf INFO-Ebene protokolliert.
Eine detaillierte Protokollierung auf DEBUG-Ebene, einschließlich Anforderungs-/Antworttexten und nicht ausgeführten Headern, kann auf einem Client mit dem logging_enable
Argument aktiviert werden:
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Ebenso kann über logging_enable
die ausführliche Protokollierung für einen einzelnen Vorgang aktiviert werden, auch wenn diese Funktion für den Client nicht aktiviert ist:
encoder.encode(dict_content, schema=definition, logging_enable=True)
Nächste Schritte
Weiterer Beispielcode
Weitere Beispiele, die gängige Szenarien für Azure Schema Registry Avro Encoder veranschaulichen, finden Sie im Verzeichnis beispiele .
Mitwirken
Beiträge und Vorschläge für dieses Projekt sind willkommen. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. Ausführliche Informationen finden Sie unter https://cla.microsoft.com.
Wenn Sie einen Pull Request (PR) übermitteln, überprüft ein CLA-Bot automatisch, ob Sie eine Lizenzvereinbarung bereitstellen und den PR entsprechend ergänzen müssen (z.B. mit einer Bezeichnung oder einem Kommentar). Führen Sie einfach die Anweisungen des Bots aus. Sie müssen dies nur einmal für alle Repositorys ausführen, die unsere CLA verwenden.
Für dieses Projekt gelten die Microsoft-Verhaltensregeln für Open Source (Microsoft Open Source Code of Conduct). Weitere Informationen finden Sie in den häufig gestellten Fragen zum Verhaltenskodex. Sie können sich auch an opencode@microsoft.com wenden, wenn Sie weitere Fragen oder Anmerkungen haben.
Azure SDK for Python
Feedback
https://aka.ms/ContentUserFeedback.
Bald verfügbar: Im Laufe des Jahres 2024 werden wir GitHub-Issues stufenweise als Feedbackmechanismus für Inhalte abbauen und durch ein neues Feedbacksystem ersetzen. Weitere Informationen finden Sie unterFeedback senden und anzeigen für