Share via


Azure Schema Registry Avro Serializer-klientbibliotek för Python – version 1.0.0b4

Azure Schema Registry är en schemalagringsplatstjänst som hanteras av Azure Event Hubs, som tillhandahåller schemalagring, versionshantering och hantering. Det här paketet tillhandahåller en Avro-serialiserare som kan serialisera och deserialisera nyttolaster som innehåller schemaregisterschemaidentifierare och Avro-kodade data.

Källkod | Paket (PyPi) | API-referensdokumentation | Prover | Ändringsloggen

Friskrivning

Stöd för Azure SDK Python-paket för Python 2.7 upphör den 1 januari 2022. Mer information och frågor finns i https://github.com/Azure/azure-sdk-for-python/issues/20691

Komma igång

Installera paketet

Installera Azure Schema Registry Avro Serializer-klientbiblioteket och Azure Identity-klientbiblioteket för Python med pip:

pip install azure-schemaregistry-avroserializer azure-identity

Krav:

Om du vill använda det här paketet måste du ha:

Autentisera klienten

Interaktionen med Schema Registry Avro Serializer börjar med en instans av klassen AvroSerializer, som tar schemagruppens namn och klassen Schema Registry Client . Klientkonstruktorn tar det fullständigt kvalificerade namnområdet för Event Hubs och Azure Active Directory-autentiseringsuppgifterna:

  • Det fullständigt kvalificerade namnområdet för Schema Registry-instansen bör följa formatet: <yournamespace>.servicebus.windows.net.

  • En AAD-autentiseringsuppgift som implementerar TokenCredential-protokollet ska skickas till konstruktorn. Det finns implementeringar av protokollet TokenCredential i azure-identity-paketet. Om du vill använda de typer av autentiseringsuppgifter som tillhandahålls av azure-identityinstallerar du Azure Identity-klientbiblioteket för Python med pip:

pip install azure-identity
  • Om du vill använda det asynkrona API som stöds i Python 3.6+, måste du först installera en asynkron transport, till exempel aiohttp:
pip install aiohttp

Skapa AvroSerializer med azure-schemaregistry-biblioteket:

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

Viktiga begrepp

AvroSerializer

Tillhandahåller API för att serialisera till och deserialisera från Avro Binary Encoding plus en rubrik med schema-ID. Använder SchemaRegistryClient för att hämta schema-ID:t från schemainnehåll eller tvärtom.

Meddelandeformat

Samma format används av schemaregisterserialiserare på Olika Azure SDK-språk.

Meddelanden kodas på följande sätt:

  • 4 byte: Formatindikator

    • För närvarande alltid noll för att ange format nedan.
  • 32 byte: Schema-ID

    • UTF-8 hexadecimal representation av GUID.
    • 32 hexsiffror, inga bindestreck.
    • Samma format och byteordning som sträng från Schema Registry-tjänsten.
  • Återstående byte: Avro-nyttolast (i allmänhet formatspecifik nyttolast)

    • Avro Binary Encoding
    • NOT Avro Object Container File, som innehåller schemat och besegrar syftet med den här serialzern för att flytta schemat från meddelandenyttolasten och till schemaregistret.

Exempel

Följande avsnitt innehåller flera kodfragment som täcker några av de vanligaste schemaregisteruppgifterna, inklusive:

Serialisering

Använd AvroSerializer.serialize metoden för att serialisera dikteringsdata med det angivna avro-schemat. Metoden skulle använda ett schema som tidigare registrerats i Schema Registry-tjänsten och behålla schemat cachelagrat för framtida serialiseringsanvändning. Du kan också undvika att förregistrera schemat till tjänsten och automatiskt registrera med serialize metoden genom att instansiera AvroSerializer med nyckelordsargumentet auto_register_schemas=True.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    encoded_bytes = serializer.serialize(dict_data, schema=definition)

Deserialisering

Använd AvroSerializer.deserialize metoden för att deserialisera råbyte till dikteringsdata. Metoden hämtar automatiskt schemat från Schema Registry Service och behåller schemat cachelagrat för framtida deserialiseringsanvändning.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
    decoded_data = serializer.deserialize(encoded_bytes)

Event Hubs Skickar integrering

Integrering med Event Hubs för att skicka serialiserade avro-dikteringsdata som brödtext i EventData.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
    event_data_batch = eventhub_producer.create_batch()
    dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
    event_data_batch.add(EventData(body=payload_bytes))
    eventhub_producer.send_batch(event_data_batch)

Händelsehubbar som tar emot integrering

Integrering med Event Hubs för att ta emot EventData och deserialiserade råbyte i avro-dikteringsdata.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    bytes_payload = b"".join(b for b in event.body)
    deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Felsökning

Allmänt

Azure Schema Registry Avro Serializer genererar undantag som definierats i Azure Core.

Loggning

Det här biblioteket använder standardloggningsbiblioteket för loggning. Grundläggande information om HTTP-sessioner (URL:er, rubriker osv.) loggas på INFO-nivå.

Detaljerad loggning på FELSÖKNINGsnivå, inklusive begärande-/svarskroppar och oredigerade huvuden, kan aktiveras på en klient med logging_enable argumentet :

import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")

logging_enable På samma sätt kan du aktivera detaljerad loggning för en enda åtgärd, även om den inte är aktiverad för klienten:

serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)

Nästa steg

Mer exempelkod

Mer information finns i exempelkatalogen som visar vanliga Azure Schema Registry Avro Serializer-scenarier.

Bidra

Det här projektet välkomnar bidrag och förslag. Merparten av bidragen kräver att du godkänner ett licensavtal för bidrag, där du deklarerar att du har behörighet att bevilja oss rättigheten att använda ditt bidrag, och att du dessutom uttryckligen gör så. Mer information finns på https://cla.microsoft.com.

När du skickar en pull-förfrågan avgör en CLA-robot automatiskt om du måste tillhandahålla ett licensavtal för bidrag med lämplig PR (t.ex. etikett eller kommentar). Följ bara robotens anvisningar. Du behöver bara göra detta en gång för alla repor som använder vårt licensavtal för bidrag.

Det här projektet använder sig av Microsofts uppförandekod för öppen källkod. Mer information finns i Vanliga frågor och svar om uppförandekod eller kontakt opencode@microsoft.com med ytterligare frågor eller kommentarer.