Sdílet prostřednictvím


Klientská knihovna Serializeru služby Azure Schema Registry Avro pro Python – verze 1.0.0b4

Azure Schema Registry je služba úložiště schémat hostovaná službou Azure Event Hubs, která poskytuje úložiště schémat, správu verzí a správu. Tento balíček poskytuje serializátor Avro schopný serializovat a deserializovat datové části obsahující identifikátory schématu registru schématu a data kódovaná Avro.

Zdrojový kód | Balíček (PyPi) | Referenční dokumentace k | rozhraní API Vzorky | Changelog

Právní omezení

Podpora balíčků Azure SDK Python pro Python 2.7 končí 1. ledna 2022. Další informace a dotazy najdete na https://github.com/Azure/azure-sdk-for-python/issues/20691

Začínáme

Instalace balíčku

Nainstalujte klientskou knihovnu Serializer služby Azure Schema Registry Avro a klientskou knihovnu Azure Identity pro Python pomocí pipu:

pip install azure-schemaregistry-avroserializer azure-identity

Požadavky:

Pokud chcete tento balíček použít, musíte mít:

Ověření klienta

Interakce se serializátorem registru schématu Avro začíná instancí třídy AvroSerializer, která přebírá název skupiny schématu a třídu Klienta registru schématu . Konstruktor klienta převezme plně kvalifikovaný obor názvů služby Event Hubs a přihlašovací údaje Azure Active Directory:

  • Plně kvalifikovaný obor názvů instance registru schématu by měl mít formát : <yournamespace>.servicebus.windows.net.

  • Přihlašovací údaje AAD, které implementují protokol TokenCredential , by měly být předány konstruktoru. V balíčku azure-identity jsou k dispozici implementace TokenCredential protokolu. Pokud chcete používat typy přihlašovacích údajů, které azure-identityposkytuje , nainstalujte prosím klientskou knihovnu Azure Identity pro Python pomocí pipu:

pip install azure-identity
  • Pokud chcete používat asynchronní rozhraní API podporované v Pythonu 3.6 nebo novější, musíte nejprve nainstalovat asynchronní přenos, například aiohttp:
pip install aiohttp

Vytvořte AvroSerializer pomocí knihovny azure-schemaregistry:

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

Klíčové koncepty

AvroSerializer

Poskytuje rozhraní API pro serializaci a deserializaci z binárního kódování Avro plus hlavičku s ID schématu. Používá SchemaRegistryClient k získání ID schématu z obsahu schématu nebo naopak.

Formát zprávy

Stejný formát používají serializátory registru schémat napříč jazyky sady Azure SDK.

Zprávy se zakódují takto:

  • 4 bajty: Indikátor formátu

    • V současné době vždy nula značí níže uvedený formát.
  • 32 bajtů: ID schématu

    • UTF-8 hexadecimální reprezentace GUID.
    • 32 šestnáctkových číslic, žádné spojovníky.
    • Stejný formát a pořadí bajtů jako řetězec ze služby Registru schématu.
  • Zbývající bajty: Datová část Avro (obecně datová část specifická pro formát)

    • Binární kódování Avro
    • NOT Avro Object Container File, který obsahuje schéma a porazí účel tohoto serialzeru přesunout schéma z datové části zprávy do registru schématu.

Příklady

V následujících částech najdete několik fragmentů kódu, které pokrývají některé nejběžnější úlohy registru schématu, mezi které patří:

Serializace

Použijte AvroSerializer.serialize metodu k serializaci dat diktování s daným schématem avro. Metoda by používala schéma dříve zaregistrované ve službě registru schématu a uchovával schéma v mezipaměti pro budoucí použití serializace. Je také možné se vyhnout předběžné registraci schématu ve službě a automaticky se zaregistrovat pomocí serialize metody vytvořením instance AvroSerializer s argumentem auto_register_schemas=Trueklíčového slova .

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    encoded_bytes = serializer.serialize(dict_data, schema=definition)

Rekonstrukci

Použijte AvroSerializer.deserialize metodu k deserializaci nezpracovaných bajtů do dat diktování. Metoda automaticky načte schéma ze služby registru schématu a udržuje schéma v mezipaměti pro budoucí použití deserializace.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
    decoded_data = serializer.deserialize(encoded_bytes)

Integrace odesílání služby Event Hubs

Integrace se službou Event Hubs pro odesílání serializovaných dat avro dict jako těla EventData

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
    event_data_batch = eventhub_producer.create_batch()
    dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
    event_data_batch.add(EventData(body=payload_bytes))
    eventhub_producer.send_batch(event_data_batch)

Integrace příjmu služby Event Hubs

Integrace se službou Event Hubs za účelem příjmu EventData a deserializace nezpracovaných bajtů do dat avro dict

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    bytes_payload = b"".join(b for b in event.body)
    deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Poradce při potížích

Obecné

Serializátor služby Azure Schema Registry Avro vyvolává výjimky definované v Azure Core.

protokolování

Tato knihovna používá pro protokolování standardní knihovnu protokolování . Základní informace o relacích HTTP (adresy URL, hlavičky atd.) se protokolují na úrovni INFO.

Podrobné protokolování úrovně LADĚNÍ, včetně těl požadavků/odpovědí a nereagovaných hlaviček, je možné povolit na klientovi s argumentem logging_enable :

import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")

Podobně logging_enable může povolit podrobné protokolování pro jednu operaci, i když není povolené pro klienta:

serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)

Další kroky

Další ukázkový kód

Další příklady najdete v adresáři ukázek , které ukazují běžné scénáře serializátoru Avro služby Azure Schema Registry.

Přispívání

Tento projekt vítá příspěvky a návrhy. Většina příspěvků vyžaduje souhlas s licenční smlouvou s přispěvatelem (CLA), která stanoví, že máte právo udělit nám práva k používání vašeho příspěvku a skutečně tak činíte. Podrobnosti najdete tady: https://cla.microsoft.com

Při odesílání žádosti o přijetí změn robot CLA automaticky určí, jestli je potřeba poskytnout smlouvu CLA, a příslušným způsobem žádost o přijetí změn upraví (např. přidáním jmenovky nebo komentáře). Stačí postupovat podle pokynů robota. Pro všechna úložiště používající naši smlouvu CLA to stačí udělat jenom jednou.

Tento projekt přijal pravidla chování pro Microsoft Open Source. Další informace najdete v nejčastějších dotazech k pravidlům chování nebo se obraťte na opencode@microsoft.com případné další dotazy nebo komentáře.