Share via


Azure Schema Registry Avro Encoder-klientbibliotek för Python – version 1.0.0

Azure Schema Registry är en schemalagringsplatstjänst som hanteras av Azure Event Hubs och som tillhandahåller schemalagring, versionshantering och hantering. Det här paketet innehåller en Avro-kodare som kan koda och avkoda nyttolaster som innehåller schemaregisterschemaidentifierare och Avro-kodat innehåll.

| Källkod Paket (PyPi) | API-referensdokumentation | Prover | Ändringsloggen

Friskrivning

Stöd för Azure SDK Python-paket för Python 2.7 upphörde den 1 januari 2022. Mer information och frågor finns i https://github.com/Azure/azure-sdk-for-python/issues/20691

Komma igång

Installera paketet

Installera Azure Schema Registry Avro Encoder-klientbiblioteket för Python med pip:

pip install azure-schemaregistry-avroencoder

Krav:

Om du vill använda det här paketet måste du ha:

Autentisera klienten

Interaktionen med Schema Registry Avro Encoder börjar med en instans av AvroEncoder-klassen, som tar schemagruppens namn och klassen Schema Registry Client . Klientkonstruktorn tar Event Hubs fullständigt kvalificerade namnområde och Azure Active Directory-autentiseringsuppgifter:

  • Det fullständigt kvalificerade namnområdet för Schema Registry-instansen bör följa formatet: <yournamespace>.servicebus.windows.net.

  • En AAD-autentiseringsuppgift som implementerar TokenCredential-protokollet ska skickas till konstruktorn. Det finns implementeringar av protokollet TokenCredential i paketet azure-identity. Om du vill använda de typer av autentiseringsuppgifter som tillhandahålls av azure-identityinstallerar du Azure Identity-klientbiblioteket för Python med pip:

pip install azure-identity
  • Om du vill använda asynkront API måste du dessutom först installera en asynkron transport, till exempel aiohttp:
pip install aiohttp

Skapa AvroEncoder med hjälp av azure-schemaregistry-biblioteket:

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Viktiga begrepp

AvroEncoder

Tillhandahåller API för att koda till och avkoda från Avro Binary Encoding plus en innehållstyp med schema-ID. Använder SchemaRegistryClient för att hämta schema-ID:t från schemainnehåll eller tvärtom.

Meddelandemodeller som stöds

Stöd har lagts till i vissa Azure Messaging SDK-modellklasser för samverkan med AvroEncoder. Dessa modeller är undertyper av det MessageType protokoll som definierats under azure.schemaregistry.encoder.avroencoder namnområdet. För närvarande är de modellklasser som stöds:

  • azure.eventhub.EventData För azure-eventhub>=5.9.0

Meddelandeformat

Om en meddelandetyp som följer MessageType-protokollet tillhandahålls till kodaren för kodning anger den motsvarande egenskaper för innehåll och innehållstyp, där:

  • content: Avro-nyttolast (i allmänhet formatspecifik nyttolast)

    • Avro-binär kodning
    • NOT Avro Object Container File, som innehåller schemat och motverkar syftet med den här kodaren att flytta schemat från meddelandenyttolasten och till schemaregistret.
  • content type: en sträng i formatet avro/binary+<schema ID>, där:

    • avro/binary är formatindikatorn
    • <schema ID> är den hexadecimala representationen av GUID, samma format och byteordning som strängen från Schema Registry-tjänsten.

Om EventData skickas som meddelandetyp anges följande egenskaper för objektet EventData :

  • Egenskapen body anges till innehållsvärdet.
  • Egenskapen content_type anges till värdet för innehållstyp.

Om meddelandetypen inte anges och kodaren som standard skapar följande diktering: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

Exempel

Följande avsnitt innehåller flera kodfragment som täcker några av de vanligaste schemaregisteruppgifterna, inklusive:

Kodning

AvroEncoder.encode Använd metoden för att koda innehåll med det angivna Avro-schemat. Metoden använder ett schema som tidigare registrerats i tjänsten Schema Registry och håller schemat cachelagrat för framtida kodningsanvändning. För att undvika förregistrering av schemat till tjänsten och automatiskt registrera det med encode metoden ska nyckelordsargumentet auto_register=True skickas till AvroEncoder konstruktorn.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

Avkodning

AvroEncoder.decode Använd metoden för att avkoda det Avro-kodade innehållet genom att antingen:

  • Skicka in ett meddelandeobjekt som är en undertyp av MessageType-protokollet.
  • Skicka en diktamen med nycklar content(typ byte) och content_type (typsträng). Metoden hämtar automatiskt schemat från Schema Registry Service och håller schemat cachelagrat för framtida avkodningsanvändning.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Event Hubs- och sändningsintegrering

Integrering med Event Hubs för att skicka ett EventData -objekt med body inställt på Avro-kodat innehåll och motsvarande content_type.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Event Hubs tar emot integrering

Integrering med Event Hubs för att ta emot ett EventData -objekt och avkoda det Avro-kodade body värdet.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Felsökning

Allmänt

Azure Schema Registry Avro Encoder genererar undantag som definierats i Azure Core om fel uppstår vid kommunikation med tjänsten Schema Registry. Fel som rör ogiltiga innehållstyper och ogiltiga scheman utlöses som azure.schemaregistry.encoder.avroencoder.InvalidContentError respektive azure.schemaregistry.encoder.avroencoder.InvalidSchemaError, där __cause__ innehåller det underliggande undantaget som genereras av Apache Avro-biblioteket.

Loggning

Det här biblioteket använder standardloggningsbiblioteket för loggning. Grundläggande information om HTTP-sessioner (URL:er, rubriker osv.) loggas på INFO-nivå.

Detaljerad loggning på FELSÖKNINGsnivå, inklusive begärande-/svarskroppar och oredigerade huvuden, kan aktiveras på en klient med logging_enable argumentet :

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

logging_enable På samma sätt kan aktivera detaljerad loggning för en enda åtgärd, även om den inte är aktiverad för klienten:

encoder.encode(dict_content, schema=definition, logging_enable=True)

Nästa steg

Mer exempelkod

Ytterligare exempel som visar vanliga Azure Schema Registry Avro Encoder-scenarier finns i exempelkatalogen .

Bidra

Det här projektet välkomnar bidrag och förslag. Merparten av bidragen kräver att du godkänner ett licensavtal för bidrag, där du deklarerar att du har behörighet att bevilja oss rättigheten att använda ditt bidrag, och att du dessutom uttryckligen gör så. Mer information finns på https://cla.microsoft.com.

När du skickar en pull-förfrågan avgör en CLA-robot automatiskt om du måste tillhandahålla ett licensavtal för bidrag med lämplig PR (t.ex. etikett eller kommentar). Följ bara robotens anvisningar. Du behöver bara göra detta en gång för alla repor som använder vårt licensavtal för bidrag.

Det här projektet använder sig av Microsofts uppförandekod för öppen källkod. Mer information finns i Vanliga frågor och svar om uppförandekoden eller kontakta opencode@microsoft.com med ytterligare frågor eller kommentarer.