Azure Schema Registry Avro Encoder-klientbibliotek för Python – version 1.0.0
Azure Schema Registry är en schemalagringsplatstjänst som hanteras av Azure Event Hubs och som tillhandahåller schemalagring, versionshantering och hantering. Det här paketet innehåller en Avro-kodare som kan koda och avkoda nyttolaster som innehåller schemaregisterschemaidentifierare och Avro-kodat innehåll.
| Källkod Paket (PyPi) | API-referensdokumentation | Prover | Ändringsloggen
Friskrivning
Stöd för Azure SDK Python-paket för Python 2.7 upphörde den 1 januari 2022. Mer information och frågor finns i https://github.com/Azure/azure-sdk-for-python/issues/20691
Komma igång
Installera paketet
Installera Azure Schema Registry Avro Encoder-klientbiblioteket för Python med pip:
pip install azure-schemaregistry-avroencoder
Krav:
Om du vill använda det här paketet måste du ha:
- Azure-prenumeration – Skapa ett kostnadsfritt konto
- Azure Schema Registry - Här är snabbstartsguiden för att skapa en schemaregistergrupp med hjälp av Azure Portal.
- Python 3.6 eller senare – Installera Python
Autentisera klienten
Interaktionen med Schema Registry Avro Encoder börjar med en instans av AvroEncoder-klassen, som tar schemagruppens namn och klassen Schema Registry Client . Klientkonstruktorn tar Event Hubs fullständigt kvalificerade namnområde och Azure Active Directory-autentiseringsuppgifter:
Det fullständigt kvalificerade namnområdet för Schema Registry-instansen bör följa formatet:
<yournamespace>.servicebus.windows.net
.En AAD-autentiseringsuppgift som implementerar TokenCredential-protokollet ska skickas till konstruktorn. Det finns implementeringar av protokollet
TokenCredential
i paketet azure-identity. Om du vill använda de typer av autentiseringsuppgifter som tillhandahålls avazure-identity
installerar du Azure Identity-klientbiblioteket för Python med pip:
pip install azure-identity
- Om du vill använda asynkront API måste du dessutom först installera en asynkron transport, till exempel aiohttp:
pip install aiohttp
Skapa AvroEncoder med hjälp av azure-schemaregistry-biblioteket:
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Viktiga begrepp
AvroEncoder
Tillhandahåller API för att koda till och avkoda från Avro Binary Encoding plus en innehållstyp med schema-ID. Använder SchemaRegistryClient för att hämta schema-ID:t från schemainnehåll eller tvärtom.
Meddelandemodeller som stöds
Stöd har lagts till i vissa Azure Messaging SDK-modellklasser för samverkan med AvroEncoder
. Dessa modeller är undertyper av det MessageType
protokoll som definierats under azure.schemaregistry.encoder.avroencoder
namnområdet. För närvarande är de modellklasser som stöds:
azure.eventhub.EventData
Förazure-eventhub>=5.9.0
Meddelandeformat
Om en meddelandetyp som följer MessageType-protokollet tillhandahålls till kodaren för kodning anger den motsvarande egenskaper för innehåll och innehållstyp, där:
content
: Avro-nyttolast (i allmänhet formatspecifik nyttolast)- Avro-binär kodning
- NOT Avro Object Container File, som innehåller schemat och motverkar syftet med den här kodaren att flytta schemat från meddelandenyttolasten och till schemaregistret.
content type
: en sträng i formatetavro/binary+<schema ID>
, där:avro/binary
är formatindikatorn<schema ID>
är den hexadecimala representationen av GUID, samma format och byteordning som strängen från Schema Registry-tjänsten.
Om EventData
skickas som meddelandetyp anges följande egenskaper för objektet EventData
:
- Egenskapen
body
anges till innehållsvärdet. - Egenskapen
content_type
anges till värdet för innehållstyp.
Om meddelandetypen inte anges och kodaren som standard skapar följande diktering: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
Exempel
Följande avsnitt innehåller flera kodfragment som täcker några av de vanligaste schemaregisteruppgifterna, inklusive:
Kodning
AvroEncoder.encode
Använd metoden för att koda innehåll med det angivna Avro-schemat.
Metoden använder ett schema som tidigare registrerats i tjänsten Schema Registry och håller schemat cachelagrat för framtida kodningsanvändning. För att undvika förregistrering av schemat till tjänsten och automatiskt registrera det med encode
metoden ska nyckelordsargumentet auto_register=True
skickas till AvroEncoder
konstruktorn.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Avkodning
AvroEncoder.decode
Använd metoden för att avkoda det Avro-kodade innehållet genom att antingen:
- Skicka in ett meddelandeobjekt som är en undertyp av MessageType-protokollet.
- Skicka en diktamen med nycklar
content
(typ byte) ochcontent_type
(typsträng). Metoden hämtar automatiskt schemat från Schema Registry Service och håller schemat cachelagrat för framtida avkodningsanvändning.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Event Hubs- och sändningsintegrering
Integrering med Event Hubs för att skicka ett EventData
-objekt med body
inställt på Avro-kodat innehåll och motsvarande content_type
.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Event Hubs tar emot integrering
Integrering med Event Hubs för att ta emot ett EventData
-objekt och avkoda det Avro-kodade body
värdet.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Felsökning
Allmänt
Azure Schema Registry Avro Encoder genererar undantag som definierats i Azure Core om fel uppstår vid kommunikation med tjänsten Schema Registry. Fel som rör ogiltiga innehållstyper och ogiltiga scheman utlöses som azure.schemaregistry.encoder.avroencoder.InvalidContentError
respektive azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
, där __cause__
innehåller det underliggande undantaget som genereras av Apache Avro-biblioteket.
Loggning
Det här biblioteket använder standardloggningsbiblioteket för loggning. Grundläggande information om HTTP-sessioner (URL:er, rubriker osv.) loggas på INFO-nivå.
Detaljerad loggning på FELSÖKNINGsnivå, inklusive begärande-/svarskroppar och oredigerade huvuden, kan aktiveras på en klient med logging_enable
argumentet :
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
logging_enable
På samma sätt kan aktivera detaljerad loggning för en enda åtgärd, även om den inte är aktiverad för klienten:
encoder.encode(dict_content, schema=definition, logging_enable=True)
Nästa steg
Mer exempelkod
Ytterligare exempel som visar vanliga Azure Schema Registry Avro Encoder-scenarier finns i exempelkatalogen .
Bidra
Det här projektet välkomnar bidrag och förslag. Merparten av bidragen kräver att du godkänner ett licensavtal för bidrag, där du deklarerar att du har behörighet att bevilja oss rättigheten att använda ditt bidrag, och att du dessutom uttryckligen gör så. Mer information finns på https://cla.microsoft.com.
När du skickar en pull-förfrågan avgör en CLA-robot automatiskt om du måste tillhandahålla ett licensavtal för bidrag med lämplig PR (t.ex. etikett eller kommentar). Följ bara robotens anvisningar. Du behöver bara göra detta en gång för alla repor som använder vårt licensavtal för bidrag.
Det här projektet använder sig av Microsofts uppförandekod för öppen källkod. Mer information finns i Vanliga frågor och svar om uppförandekoden eller kontakta opencode@microsoft.com med ytterligare frågor eller kommentarer.
Azure SDK for Python
Feedback
https://aka.ms/ContentUserFeedback.
Kommer snart: Under hela 2024 kommer vi att fasa ut GitHub-problem som feedbackmekanism för innehåll och ersätta det med ett nytt feedbacksystem. Mer information finns i:Skicka och visa feedback för