Share via


Azure Schema Registry Avro Encoder-Clientbibliothek für Python – Version 1.0.0

Azure Schema Registry ist ein von Azure Event Hubs gehosteter Schemarepositorydienst, der Schemaspeicher, Versionsverwaltung und -verwaltung bereitstellt. Dieses Paket stellt einen Avro-Encoder bereit, der Nutzdaten codieren und decodieren kann, die Schemaregistrierungsschemabezeichner und Avro-codierte Inhalte enthalten.

Quellcode | Paket (PyPi) | API-Referenzdokumentation | Proben | Changelog

Haftungsausschluss

Die Unterstützung von Python-Paketen für Das Azure SDK für Python 2.7 wurde am 01. Januar 2022 eingestellt. Weitere Informationen und Antworten finden Sie unter https://github.com/Azure/azure-sdk-for-python/issues/20691.

Erste Schritte

Installieren des Pakets

Installieren Sie die Azure Schema Registry Avro Encoder-Clientbibliothek für Python mit pip:

pip install azure-schemaregistry-avroencoder

Voraussetzungen:

Um dieses Paket verwenden zu können, benötigen Sie Folgendes:

Authentifizieren des Clients

Die Interaktion mit dem Avro Encoder der Schemaregistrierung beginnt mit einer Instanz der AvroEncoder-Klasse, die den Schemagruppennamen und die Schemaregistrierungsclientklasse übernimmt. Der Clientkonstruktor übernimmt den vollqualifizierten Event Hubs-Namespace und Azure Active Directory-Anmeldeinformationen:

  • Der vollqualifizierte Namespace der Schemaregistrierungsinstanz sollte das folgende Format aufweisen: <yournamespace>.servicebus.windows.net.

  • AAD-Anmeldeinformationen, die das TokenCredential-Protokoll implementieren, sollten an den Konstruktor übergeben werden. Im Paket azure-identity sind Implementierungen des TokenCredential Protokolls verfügbar. Um die von azure-identitybereitgestellten Anmeldeinformationstypen zu verwenden, installieren Sie die Azure Identity-Clientbibliothek für Python mit pip:

pip install azure-identity
  • Außerdem müssen Sie für die Verwendung der asynchronen API zunächst einen asynchronen Transport installieren, z. B. aiohttp:
pip install aiohttp

Erstellen Sie AvroEncoder mithilfe der Bibliothek azure-schemaregistry:

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Wichtige Begriffe

AvroEncoder

Stellt eine API zum Codieren und Decodieren von Avro Binary Encoding sowie einen Inhaltstyp mit Schema-ID bereit. Verwendet SchemaRegistryClient , um Schema-IDs aus Schemainhalten abzurufen oder umgekehrt.

Unterstützte Nachrichtenmodelle

Unterstützung für bestimmte Azure Messaging SDK-Modellklassen wurde für die AvroEncoderInteroperabilität mit hinzugefügt. Bei diesen Modellen handelt es sich um Untertypen des MessageType unter dem azure.schemaregistry.encoder.avroencoder Namespace definierten Protokolls. Derzeit werden folgende Modellklassen unterstützt:

  • azure.eventhub.EventData für azure-eventhub>=5.9.0

Nachrichtenformat

Wenn ein Nachrichtentyp, der dem MessageType-Protokoll folgt, dem Encoder für die Codierung bereitgestellt wird, werden die entsprechenden Inhalts- und Inhaltstypeigenschaften festgelegt, wobei Folgendes gilt:

  • content: Avro-Nutzlast (im Allgemeinen formatspezifische Nutzlast)

    • Avro Binärcodierung
    • NICHT Avro-Objektcontainerdatei, die das Schema enthält und den Zweck dieses Encoders verfehlt, das Schema aus der Nachrichtennutzlast in die Schemaregistrierung zu verschieben.
  • content type: eine Zeichenfolge im Format avro/binary+<schema ID>, wobei:

    • avro/binary ist der Formatindikator
    • <schema ID> ist die hexadezimale Darstellung der GUID, gleiches Format und die gleiche Bytereihenfolge wie die Zeichenfolge aus dem Schemaregistrierungsdienst.

Wenn EventData als Nachrichtentyp übergeben wird, werden die folgenden Eigenschaften für das EventData -Objekt festgelegt:

  • Die body -Eigenschaft wird auf den Inhaltswert festgelegt.
  • Die content_type -Eigenschaft wird auf den Inhaltstypwert festgelegt.

Wenn der Nachrichtentyp nicht angegeben ist, erstellt der Encoder standardmäßig das folgende Dict: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

Beispiele

Die folgenden Abschnitte enthalten mehrere Codeausschnitte, die einige der am häufigsten verwendeten Schemaregistrierungsaufgaben behandeln, einschließlich:

Codierung

Verwenden Sie die AvroEncoder.encode -Methode, um Inhalte mit dem angegebenen Avro-Schema zu codieren. Die -Methode verwendet ein Schema, das zuvor beim Schemaregistrierungsdienst registriert wurde, und behält das Schema für die zukünftige Codierungsverwendung zwischengespeichert. Um eine Vorregistrierung des Schemas beim Dienst zu vermeiden und es automatisch bei der encode -Methode zu registrieren, sollte das Schlüsselwortargument auto_register=True an den AvroEncoder Konstruktor übergeben werden.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

Decodierung

Verwenden Sie die AvroEncoder.decode -Methode, um den Avro-codierten Inhalt durch einen der folgenden Methoden zu decodieren:

  • Übergeben eines Nachrichtenobjekts, das ein Untertyp des MessageType-Protokolls ist.
  • Übergeben eines Dikts mit Schlüsseln content(Typ bytes) und content_type (Typzeichenfolge). Die -Methode ruft das Schema automatisch aus dem Schemaregistrierungsdienst ab und speichert das Schema für die zukünftige Decodierungsverwendung zwischen.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Event Hubs-Sendeintegration

Integration mit Event Hubs zum Senden eines EventData Objekts mit body festgelegtem Avro-codierten Inhalt und entsprechenden content_type.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Event Hubs, die Integration empfangen

Integration mit Event Hubs , um ein EventData Objekt zu empfangen und den Avro-codierten Wert zu decodieren body .

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Problembehandlung

Allgemein

Azure Schema Registry Avro Encoder löst in Azure Core definierte Ausnahmen aus, wenn bei der Kommunikation mit dem Schemaregistrierungsdienst Fehler auftreten. Fehler im Zusammenhang mit ungültigen Inhaltstypen und ungültigen Schemas werden als azure.schemaregistry.encoder.avroencoder.InvalidContentError bzw azure.schemaregistry.encoder.avroencoder.InvalidSchemaError. ausgelöst, wobei __cause__ die zugrunde liegende Ausnahme enthalten wird, die von der Apache Avro-Bibliothek ausgelöst wird.

Protokollierung

Diese Bibliothek verwendet die Standardprotokollierungsbibliothek für die Protokollierung. Grundlegende Informationen zu HTTP-Sitzungen (URLs, Header usw.) werden auf INFO-Ebene protokolliert.

Eine detaillierte Protokollierung auf DEBUG-Ebene, einschließlich Anforderungs-/Antworttexten und nicht ausgeführten Headern, kann auf einem Client mit dem logging_enable Argument aktiviert werden:

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Ebenso kann über logging_enable die ausführliche Protokollierung für einen einzelnen Vorgang aktiviert werden, auch wenn diese Funktion für den Client nicht aktiviert ist:

encoder.encode(dict_content, schema=definition, logging_enable=True)

Nächste Schritte

Weiterer Beispielcode

Weitere Beispiele, die gängige Szenarien für Azure Schema Registry Avro Encoder veranschaulichen, finden Sie im Verzeichnis beispiele .

Mitwirken

Beiträge und Vorschläge für dieses Projekt sind willkommen. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. Ausführliche Informationen finden Sie unter https://cla.microsoft.com.

Wenn Sie einen Pull Request (PR) übermitteln, überprüft ein CLA-Bot automatisch, ob Sie eine Lizenzvereinbarung bereitstellen und den PR entsprechend ergänzen müssen (z.B. mit einer Bezeichnung oder einem Kommentar). Führen Sie einfach die Anweisungen des Bots aus. Sie müssen dies nur einmal für alle Repositorys ausführen, die unsere CLA verwenden.

Für dieses Projekt gelten die Microsoft-Verhaltensregeln für Open Source (Microsoft Open Source Code of Conduct). Weitere Informationen finden Sie in den häufig gestellten Fragen zum Verhaltenskodex. Sie können sich auch an opencode@microsoft.com wenden, wenn Sie weitere Fragen oder Anmerkungen haben.