Klientská knihovna kodéru Azure Schema Registry Avro pro Python – verze 1.0.0
Azure Schema Registry je služba úložiště schémat hostovaná službou Azure Event Hubs, která poskytuje úložiště schémat, správu verzí a správu. Tento balíček poskytuje kodér Avro schopný kódování a dekódování datových částí obsahujících identifikátory schématu registru schématu a obsah kódovaný Avro.
Zdrojový kód | Balíček (PyPi) | Referenční dokumentace k | rozhraní API Vzorky | Changelog
Právní omezení
Podpora balíčků Azure SDK Python pro Python 2.7 skončila 1. ledna 2022. Další informace a dotazy najdete na https://github.com/Azure/azure-sdk-for-python/issues/20691
Začínáme
Instalace balíčku
Nainstalujte klientskou knihovnu kodéru Azure Schema Registry Avro pro Python pomocí pipu:
pip install azure-schemaregistry-avroencoder
Požadavky:
Pokud chcete tento balíček použít, musíte mít:
- Předplatné Azure – Vytvoření bezplatného účtu
- Azure Schema Registry - Tady je průvodce rychlým startem pro vytvoření skupiny registru schématu pomocí Azure Portal.
- Python 3.6 nebo novější – instalace Pythonu
Ověření klienta
Interakce s kodérem Avro registru schématu začíná instancí třídy AvroEncoder, která přebírá název skupiny schématu a klientskou třídu registru schématu . Konstruktor klienta převezme plně kvalifikovaný obor názvů služby Event Hubs a přihlašovací údaje Azure Active Directory:
Plně kvalifikovaný obor názvů instance registru schématu by měl mít formát :
<yournamespace>.servicebus.windows.net
.Přihlašovací údaje AAD, které implementují protokol TokenCredential , by měly být předány konstruktoru. V balíčku azure-identity jsou k dispozici implementace
TokenCredential
protokolu. Pokud chcete používat typy přihlašovacích údajů, kteréazure-identity
poskytuje , nainstalujte prosím klientskou knihovnu Azure Identity pro Python pomocí pipu:
pip install azure-identity
- Pokud chcete používat asynchronní rozhraní API, musíte nejprve nainstalovat asynchronní přenos, například aiohttp:
pip install aiohttp
Vytvořte AvroEncoder pomocí knihovny azure-schemaregistry:
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Klíčové koncepty
AvroEncoder
Poskytuje rozhraní API pro kódování a dekódování z binárního kódování Avro plus typ obsahu s ID schématu. Používá SchemaRegistryClient k získání ID schématu z obsahu schématu nebo naopak.
Podporované modely zpráv
K určitým třídám modelu sady Sdk pro zasílání zpráv Služby Azure byla přidána podpora pro interoperabilitu AvroEncoder
s . Tyto modely jsou podtypy protokolu definovaného MessageType
v azure.schemaregistry.encoder.avroencoder
oboru názvů. V současné době jsou podporované třídy modelu:
azure.eventhub.EventData
Proazure-eventhub>=5.9.0
Formát zprávy
Pokud je kodéru pro kódování poskytnut typ zprávy, který následuje po protokolu MessageType, nastaví odpovídající vlastnosti obsahu a typu obsahu, kde:
content
: Datová část Avro (obecně datová část specifická pro formát)- Binární kódování Avro
- NOT Avro Object Container File, který zahrnuje schéma a porazí účel tohoto kodéru přesunout schéma z datové části zprávy do registru schématu.
content type
: řetězec ve formátuavro/binary+<schema ID>
, kde:avro/binary
je indikátor formátu.<schema ID>
je šestnáctková reprezentace identifikátoru GUID, stejného formátu a pořadí bajtů jako řetězec ze služby Registru schématu.
Pokud EventData
se předá jako typ zprávy, nastaví se u objektu EventData
následující vlastnosti:
- Vlastnost
body
bude nastavena na hodnotu obsahu. - Vlastnost
content_type
bude nastavena na hodnotu typu obsahu.
Pokud typ zprávy není zadán a kodér ve výchozím nastavení vytvoří následující dikt: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
Příklady
V následujících částech najdete několik fragmentů kódu, které pokrývají některé nejběžnější úlohy registru schématu, mezi které patří:
Encoding
AvroEncoder.encode
Pomocí metody kódujte obsah s daným schématem Avro.
Metoda použije schéma, které bylo dříve zaregistrováno ve službě registru schématu, a bude ho uchovávat v mezipaměti pro budoucí použití kódování. Aby se zabránilo předběžné registraci schématu ve službě a automatické registraci v encode
metodě, měl by být argument auto_register=True
klíčového slova předán konstruktoru AvroEncoder
.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Dekódování
AvroEncoder.decode
K dekódování obsahu zakódovaného Avro použijte metodu:
- Předání objektu zprávy, který je podtypem protokolu MessageType.
- Předání diktátu s klíči
content
(zadejte bajty) acontent_type
(zadejte řetězec). Metoda automaticky načte schéma ze služby registru schématu a udržuje schéma v mezipaměti pro budoucí použití dekódování.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Integrace odesílání služby Event Hubs
Integrace se službou Event Hubs pro odeslání objektu s nastaveným EventData
obsahem body
kódovaným Avro a odpovídajícím content_type
objektem .
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Integrace příjmu služby Event Hubs
Integrace se službou Event Hubs pro příjem objektu EventData
a dekódování hodnoty kódované body
Avro
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Poradce při potížích
Obecné
Kodér Azure Schema Registry Avro vyvolá výjimky definované v Azure Core , pokud při komunikaci se službou Registru schématu dojde k chybám. Chyby související s neplatnými typy obsahu a neplatnými schématy budou vyvolány jako azure.schemaregistry.encoder.avroencoder.InvalidContentError
a azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
, kde __cause__
budou obsahovat základní výjimku vyvolanou knihovnou Apache Avro.
protokolování
Tato knihovna používá pro protokolování standardní knihovnu protokolování . Základní informace o relacích HTTP (adresy URL, hlavičky atd.) se protokolují na úrovni INFO.
Podrobné protokolování úrovně LADĚNÍ, včetně těl požadavků/odpovědí a nereagovaných hlaviček, je možné povolit na klientovi s argumentem logging_enable
:
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Podobně logging_enable
může povolit podrobné protokolování pro jednu operaci, i když není povolené pro klienta:
encoder.encode(dict_content, schema=definition, logging_enable=True)
Další kroky
Další ukázkový kód
Další příklady demonstrujících běžné scénáře kodéru Azure Schema Registry Avro jsou v adresáři samples .
Přispívání
Tento projekt vítá příspěvky a návrhy. Většina příspěvků vyžaduje souhlas s licenční smlouvou s přispěvatelem (CLA), která stanoví, že máte právo udělit nám práva k používání vašeho příspěvku a skutečně tak činíte. Podrobnosti najdete tady: https://cla.microsoft.com
Při odesílání žádosti o přijetí změn robot CLA automaticky určí, jestli je potřeba poskytnout smlouvu CLA, a příslušným způsobem žádost o přijetí změn upraví (např. přidáním jmenovky nebo komentáře). Stačí postupovat podle pokynů robota. Pro všechna úložiště používající naši smlouvu CLA to stačí udělat jenom jednou.
Tento projekt přijal pravidla chování pro Microsoft Open Source. Další informace najdete v nejčastějších dotazech k pravidlům chování nebo se obraťte na opencode@microsoft.com případné další dotazy nebo komentáře.
Azure SDK for Python