Klientská knihovna Serializeru služby Azure Schema Registry Avro pro Python – verze 1.0.0b4
Azure Schema Registry je služba úložiště schémat hostovaná službou Azure Event Hubs, která poskytuje úložiště schémat, správu verzí a správu. Tento balíček poskytuje serializátor Avro schopný serializovat a deserializovat datové části obsahující identifikátory schématu registru schématu a data kódovaná Avro.
Zdrojový kód | Balíček (PyPi) | Referenční dokumentace k | rozhraní API Vzorky | Changelog
Právní omezení
Podpora balíčků Azure SDK Python pro Python 2.7 končí 1. ledna 2022. Další informace a dotazy najdete na https://github.com/Azure/azure-sdk-for-python/issues/20691
Začínáme
Instalace balíčku
Nainstalujte klientskou knihovnu Serializer služby Azure Schema Registry Avro a klientskou knihovnu Azure Identity pro Python pomocí pipu:
pip install azure-schemaregistry-avroserializer azure-identity
Požadavky:
Pokud chcete tento balíček použít, musíte mít:
- Předplatné Azure – Vytvoření bezplatného účtu
- Azure Schema Registry
- Python 2.7, 3.6 nebo novější – instalace Pythonu
Ověření klienta
Interakce se serializátorem registru schématu Avro začíná instancí třídy AvroSerializer, která přebírá název skupiny schématu a třídu Klienta registru schématu . Konstruktor klienta převezme plně kvalifikovaný obor názvů služby Event Hubs a přihlašovací údaje Azure Active Directory:
Plně kvalifikovaný obor názvů instance registru schématu by měl mít formát :
<yournamespace>.servicebus.windows.net
.Přihlašovací údaje AAD, které implementují protokol TokenCredential , by měly být předány konstruktoru. V balíčku azure-identity jsou k dispozici implementace
TokenCredential
protokolu. Pokud chcete používat typy přihlašovacích údajů, kteréazure-identity
poskytuje , nainstalujte prosím klientskou knihovnu Azure Identity pro Python pomocí pipu:
pip install azure-identity
- Pokud chcete používat asynchronní rozhraní API podporované v Pythonu 3.6 nebo novější, musíte nejprve nainstalovat asynchronní přenos, například aiohttp:
pip install aiohttp
Vytvořte AvroSerializer pomocí knihovny azure-schemaregistry:
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
Klíčové koncepty
AvroSerializer
Poskytuje rozhraní API pro serializaci a deserializaci z binárního kódování Avro plus hlavičku s ID schématu. Používá SchemaRegistryClient k získání ID schématu z obsahu schématu nebo naopak.
Formát zprávy
Stejný formát používají serializátory registru schémat napříč jazyky sady Azure SDK.
Zprávy se zakódují takto:
4 bajty: Indikátor formátu
- V současné době vždy nula značí níže uvedený formát.
32 bajtů: ID schématu
- UTF-8 hexadecimální reprezentace GUID.
- 32 šestnáctkových číslic, žádné spojovníky.
- Stejný formát a pořadí bajtů jako řetězec ze služby Registru schématu.
Zbývající bajty: Datová část Avro (obecně datová část specifická pro formát)
- Binární kódování Avro
- NOT Avro Object Container File, který obsahuje schéma a porazí účel tohoto serialzeru přesunout schéma z datové části zprávy do registru schématu.
Příklady
V následujících částech najdete několik fragmentů kódu, které pokrývají některé nejběžnější úlohy registru schématu, mezi které patří:
Serializace
Použijte AvroSerializer.serialize
metodu k serializaci dat diktování s daným schématem avro.
Metoda by používala schéma dříve zaregistrované ve službě registru schématu a uchovával schéma v mezipaměti pro budoucí použití serializace. Je také možné se vyhnout předběžné registraci schématu ve službě a automaticky se zaregistrovat pomocí serialize
metody vytvořením instance AvroSerializer
s argumentem auto_register_schemas=True
klíčového slova .
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
encoded_bytes = serializer.serialize(dict_data, schema=definition)
Rekonstrukci
Použijte AvroSerializer.deserialize
metodu k deserializaci nezpracovaných bajtů do dat diktování.
Metoda automaticky načte schéma ze služby registru schématu a udržuje schéma v mezipaměti pro budoucí použití deserializace.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
decoded_data = serializer.deserialize(encoded_bytes)
Integrace odesílání služby Event Hubs
Integrace se službou Event Hubs pro odesílání serializovaných dat avro dict jako těla EventData
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_serializer:
event_data_batch = eventhub_producer.create_batch()
dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
event_data_batch.add(EventData(body=payload_bytes))
eventhub_producer.send_batch(event_data_batch)
Integrace příjmu služby Event Hubs
Integrace se službou Event Hubs za účelem příjmu EventData
a deserializace nezpracovaných bajtů do dat avro dict
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
bytes_payload = b"".join(b for b in event.body)
deserialized_data = avro_serializer.deserialize(bytes_payload)
with eventhub_consumer, avro_serializer:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Poradce při potížích
Obecné
Serializátor služby Azure Schema Registry Avro vyvolává výjimky definované v Azure Core.
protokolování
Tato knihovna používá pro protokolování standardní knihovnu protokolování . Základní informace o relacích HTTP (adresy URL, hlavičky atd.) se protokolují na úrovni INFO.
Podrobné protokolování úrovně LADĚNÍ, včetně těl požadavků/odpovědí a nereagovaných hlaviček, je možné povolit na klientovi s argumentem logging_enable
:
import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")
Podobně logging_enable
může povolit podrobné protokolování pro jednu operaci, i když není povolené pro klienta:
serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)
Další kroky
Další ukázkový kód
Další příklady najdete v adresáři ukázek , které ukazují běžné scénáře serializátoru Avro služby Azure Schema Registry.
Přispívání
Tento projekt vítá příspěvky a návrhy. Většina příspěvků vyžaduje souhlas s licenční smlouvou s přispěvatelem (CLA), která stanoví, že máte právo udělit nám práva k používání vašeho příspěvku a skutečně tak činíte. Podrobnosti najdete tady: https://cla.microsoft.com
Při odesílání žádosti o přijetí změn robot CLA automaticky určí, jestli je potřeba poskytnout smlouvu CLA, a příslušným způsobem žádost o přijetí změn upraví (např. přidáním jmenovky nebo komentáře). Stačí postupovat podle pokynů robota. Pro všechna úložiště používající naši smlouvu CLA to stačí udělat jenom jednou.
Tento projekt přijal pravidla chování pro Microsoft Open Source. Další informace najdete v nejčastějších dotazech k pravidlům chování nebo se obraťte na opencode@microsoft.com případné další dotazy nebo komentáře.
Azure SDK for Python