Sdílet prostřednictvím


Klientská knihovna kodéru Azure Schema Registry Avro pro Python – verze 1.0.0

Azure Schema Registry je služba úložiště schémat hostovaná službou Azure Event Hubs, která poskytuje úložiště schémat, správu verzí a správu. Tento balíček poskytuje kodér Avro schopný kódování a dekódování datových částí obsahujících identifikátory schématu registru schématu a obsah kódovaný Avro.

Zdrojový kód | Balíček (PyPi) | Referenční dokumentace k | rozhraní API Vzorky | Changelog

Právní omezení

Podpora balíčků Azure SDK Python pro Python 2.7 skončila 1. ledna 2022. Další informace a dotazy najdete na https://github.com/Azure/azure-sdk-for-python/issues/20691

Začínáme

Instalace balíčku

Nainstalujte klientskou knihovnu kodéru Azure Schema Registry Avro pro Python pomocí pipu:

pip install azure-schemaregistry-avroencoder

Požadavky:

Pokud chcete tento balíček použít, musíte mít:

Ověření klienta

Interakce s kodérem Avro registru schématu začíná instancí třídy AvroEncoder, která přebírá název skupiny schématu a klientskou třídu registru schématu . Konstruktor klienta převezme plně kvalifikovaný obor názvů služby Event Hubs a přihlašovací údaje Azure Active Directory:

  • Plně kvalifikovaný obor názvů instance registru schématu by měl mít formát : <yournamespace>.servicebus.windows.net.

  • Přihlašovací údaje AAD, které implementují protokol TokenCredential , by měly být předány konstruktoru. V balíčku azure-identity jsou k dispozici implementace TokenCredential protokolu. Pokud chcete používat typy přihlašovacích údajů, které azure-identityposkytuje , nainstalujte prosím klientskou knihovnu Azure Identity pro Python pomocí pipu:

pip install azure-identity
  • Pokud chcete používat asynchronní rozhraní API, musíte nejprve nainstalovat asynchronní přenos, například aiohttp:
pip install aiohttp

Vytvořte AvroEncoder pomocí knihovny azure-schemaregistry:

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Klíčové koncepty

AvroEncoder

Poskytuje rozhraní API pro kódování a dekódování z binárního kódování Avro plus typ obsahu s ID schématu. Používá SchemaRegistryClient k získání ID schématu z obsahu schématu nebo naopak.

Podporované modely zpráv

K určitým třídám modelu sady Sdk pro zasílání zpráv Služby Azure byla přidána podpora pro interoperabilitu AvroEncoders . Tyto modely jsou podtypy protokolu definovaného MessageType v azure.schemaregistry.encoder.avroencoder oboru názvů. V současné době jsou podporované třídy modelu:

  • azure.eventhub.EventData Pro azure-eventhub>=5.9.0

Formát zprávy

Pokud je kodéru pro kódování poskytnut typ zprávy, který následuje po protokolu MessageType, nastaví odpovídající vlastnosti obsahu a typu obsahu, kde:

  • content: Datová část Avro (obecně datová část specifická pro formát)

    • Binární kódování Avro
    • NOT Avro Object Container File, který zahrnuje schéma a porazí účel tohoto kodéru přesunout schéma z datové části zprávy do registru schématu.
  • content type: řetězec ve formátu avro/binary+<schema ID>, kde:

    • avro/binary je indikátor formátu.
    • <schema ID> je šestnáctková reprezentace identifikátoru GUID, stejného formátu a pořadí bajtů jako řetězec ze služby Registru schématu.

Pokud EventData se předá jako typ zprávy, nastaví se u objektu EventData následující vlastnosti:

  • Vlastnost body bude nastavena na hodnotu obsahu.
  • Vlastnost content_type bude nastavena na hodnotu typu obsahu.

Pokud typ zprávy není zadán a kodér ve výchozím nastavení vytvoří následující dikt: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

Příklady

V následujících částech najdete několik fragmentů kódu, které pokrývají některé nejběžnější úlohy registru schématu, mezi které patří:

Encoding

AvroEncoder.encode Pomocí metody kódujte obsah s daným schématem Avro. Metoda použije schéma, které bylo dříve zaregistrováno ve službě registru schématu, a bude ho uchovávat v mezipaměti pro budoucí použití kódování. Aby se zabránilo předběžné registraci schématu ve službě a automatické registraci v encode metodě, měl by být argument auto_register=True klíčového slova předán konstruktoru AvroEncoder .

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

Dekódování

AvroEncoder.decode K dekódování obsahu zakódovaného Avro použijte metodu:

  • Předání objektu zprávy, který je podtypem protokolu MessageType.
  • Předání diktátu s klíči content(zadejte bajty) a content_type (zadejte řetězec). Metoda automaticky načte schéma ze služby registru schématu a udržuje schéma v mezipaměti pro budoucí použití dekódování.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Integrace odesílání služby Event Hubs

Integrace se službou Event Hubs pro odeslání objektu s nastaveným EventData obsahem body kódovaným Avro a odpovídajícím content_typeobjektem .

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Integrace příjmu služby Event Hubs

Integrace se službou Event Hubs pro příjem objektu EventData a dekódování hodnoty kódované body Avro

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Poradce při potížích

Obecné

Kodér Azure Schema Registry Avro vyvolá výjimky definované v Azure Core , pokud při komunikaci se službou Registru schématu dojde k chybám. Chyby související s neplatnými typy obsahu a neplatnými schématy budou vyvolány jako azure.schemaregistry.encoder.avroencoder.InvalidContentError a azure.schemaregistry.encoder.avroencoder.InvalidSchemaError, kde __cause__ budou obsahovat základní výjimku vyvolanou knihovnou Apache Avro.

protokolování

Tato knihovna používá pro protokolování standardní knihovnu protokolování . Základní informace o relacích HTTP (adresy URL, hlavičky atd.) se protokolují na úrovni INFO.

Podrobné protokolování úrovně LADĚNÍ, včetně těl požadavků/odpovědí a nereagovaných hlaviček, je možné povolit na klientovi s argumentem logging_enable :

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Podobně logging_enable může povolit podrobné protokolování pro jednu operaci, i když není povolené pro klienta:

encoder.encode(dict_content, schema=definition, logging_enable=True)

Další kroky

Další ukázkový kód

Další příklady demonstrujících běžné scénáře kodéru Azure Schema Registry Avro jsou v adresáři samples .

Přispívání

Tento projekt vítá příspěvky a návrhy. Většina příspěvků vyžaduje souhlas s licenční smlouvou s přispěvatelem (CLA), která stanoví, že máte právo udělit nám práva k používání vašeho příspěvku a skutečně tak činíte. Podrobnosti najdete tady: https://cla.microsoft.com

Při odesílání žádosti o přijetí změn robot CLA automaticky určí, jestli je potřeba poskytnout smlouvu CLA, a příslušným způsobem žádost o přijetí změn upraví (např. přidáním jmenovky nebo komentáře). Stačí postupovat podle pokynů robota. Pro všechna úložiště používající naši smlouvu CLA to stačí udělat jenom jednou.

Tento projekt přijal pravidla chování pro Microsoft Open Source. Další informace najdete v nejčastějších dotazech k pravidlům chování nebo se obraťte na opencode@microsoft.com případné další dotazy nebo komentáře.