Azure Schema Registry Avro Encoder-clientbibliotheek voor Python - versie 1.0.0
Azure Schema Registry is een service voor schemaopslagplaats die wordt gehost door Azure Event Hubs en die schema-opslag, versiebeheer en beheer biedt. Dit pakket biedt een Avro-encoder die payloads met schemaregisterschema-id's en avro-gecodeerde inhoud kan coderen en decoderen.
Broncode | Pakket (PyPi) | API-referentiedocumentatie | Monsters | Changelog
Disclaimer
Ondersteuning voor Azure SDK Python-pakketten voor Python 2.7 is beëindigd op 1 januari 2022. Voor meer informatie en vragen raadpleegt u https://github.com/Azure/azure-sdk-for-python/issues/20691
Aan de slag
Het pakket installeren
Installeer de Azure Schema Registry Avro Encoder-clientbibliotheek voor Python met pip:
pip install azure-schemaregistry-avroencoder
Vereisten:
Als u dit pakket wilt gebruiken, hebt u het volgende nodig:
- Azure-abonnement: maak een gratis account
- Azure-schemaregister - Hier volgt de snelstartgids voor het maken van een schemaregistergroep met behulp van de Azure Portal.
- Python 3.6 of hoger - Python installeren
De client verifiëren
Interactie met de Avro-encoder van het schemaregister begint met een exemplaar van de klasse AvroEncoder, waarbij de naam van de schemagroep en de clientklasse schemaregister worden gebruikt. De clientconstructor neemt de Volledig gekwalificeerde Event Hubs-naamruimte en azure Active Directory-referenties op:
De volledig gekwalificeerde naamruimte van het schemaregisterexemplaren moet de volgende indeling hebben:
<yournamespace>.servicebus.windows.net
.Een AAD-referentie waarmee het TokenCredential-protocol wordt geïmplementeerd, moet worden doorgegeven aan de constructor. Er zijn implementaties van het
TokenCredential
protocol beschikbaar in het azure-identity-pakket. Als u de referentietypen wilt gebruiken die worden geleverd doorazure-identity
, installeert u de Azure Identity-clientbibliotheek voor Python met pip:
pip install azure-identity
- Als u de asynchrone API wilt gebruiken, moet u bovendien eerst een asynchroon transport installeren, zoals aiohttp:
pip install aiohttp
AvroEncoder maken met behulp van de bibliotheek azure-schemaregistry:
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Belangrijkste concepten
AvroEncoder
Biedt API voor het coderen naar en decoderen van Binaire Avro-codering plus een inhoudstype met schema-id. Maakt gebruik van SchemaRegistryClient om schema-id's op te halen uit schema-inhoud of omgekeerd.
Ondersteunde berichtmodellen
Er is ondersteuning toegevoegd aan bepaalde Azure Messaging SDK-modelklassen voor interoperabiliteit met de AvroEncoder
. Deze modellen zijn subtypen van het MessageType
protocol dat is gedefinieerd onder de azure.schemaregistry.encoder.avroencoder
naamruimte. Momenteel zijn de ondersteunde modelklassen:
azure.eventhub.EventData
Voorazure-eventhub>=5.9.0
Berichtindeling
Als een berichttype dat volgt op het MessageType-protocol wordt geleverd aan de encoder voor codering, worden de bijbehorende eigenschappen van inhoud en inhoudstype ingesteld, waarbij:
content
: Avro-nettolading (in het algemeen, indelingsspecifieke nettolading)- Binaire Avro-codering
- NOT Avro-objectcontainerbestand, dat het schema bevat en het doel van deze encoder teniet doet om het schema uit de nettolading van het bericht naar het schemaregister te verplaatsen.
content type
: een tekenreeks met de indelingavro/binary+<schema ID>
, waarbij:avro/binary
is de indelingsindicator<schema ID>
is de hexadecimale weergave van GUID, dezelfde indeling en bytevolgorde als de tekenreeks van de Schema Registry-service.
Als EventData
wordt doorgegeven als het berichttype, worden de volgende eigenschappen ingesteld voor het EventData
-object:
- De
body
eigenschap wordt ingesteld op de inhoudswaarde. - De
content_type
eigenschap wordt ingesteld op de waarde van het inhoudstype.
Als het berichttype niet is opgegeven en standaard, maakt de encoder het volgende dict: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
Voorbeelden
De volgende secties bevatten verschillende codefragmenten voor enkele van de meest voorkomende schemaregistertaken, waaronder:
Encoding
Gebruik de AvroEncoder.encode
methode om inhoud te coderen met het opgegeven Avro-schema.
De methode gebruikt een schema dat eerder is geregistreerd bij de Schema Registry-service en houdt het schema in de cache voor toekomstig coderingsgebruik. Om te voorkomen dat het schema vooraf wordt geregistreerd bij de service en het automatisch wordt geregistreerd met de encode
methode, moet het trefwoordargument auto_register=True
worden doorgegeven aan de AvroEncoder
constructor.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Decodering
Gebruik de AvroEncoder.decode
methode om de met Avro gecodeerde inhoud te decoderen door:
- Doorgeven van een berichtobject dat een subtype van het MessageType-protocol is.
- Een dict doorgeven met sleutels
content
(type bytes) encontent_type
(type tekenreeks). Met de methode wordt het schema automatisch opgehaald uit de schemaregisterservice en wordt het schema in de cache opgeslagen voor toekomstig decoderingsgebruik.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Integratie van verzenden van Event Hubs
Integratie met Event Hubs voor het verzenden van een EventData
object met body
ingesteld op Avro-gecodeerde inhoud en de bijbehorende content_type
.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Integratie van Event Hubs
Integratie met Event Hubs om een EventData
object te ontvangen en de met Avro gecodeerde body
waarde te decoderen.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Problemen oplossen
Algemeen
Azure Schema Registry Avro Encoder genereert uitzonderingen die zijn gedefinieerd in Azure Core als er fouten zijn opgetreden bij de communicatie met de Schema Registry-service. Fouten met betrekking tot ongeldige inhouds-/inhoudstypen en ongeldige schema's worden gegenereerd als azure.schemaregistry.encoder.avroencoder.InvalidContentError
respectievelijk en azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
, waarbij __cause__
de onderliggende uitzondering wordt weergegeven door de Apache Avro-bibliotheek.
Logboekregistratie
Deze bibliotheek maakt gebruik van de standaardbibliotheek voor logboekregistratie voor logboekregistratie. Basisinformatie over HTTP-sessies (URL's, headers, enzovoort) wordt geregistreerd op INFO-niveau.
Gedetailleerde logboekregistratie op foutopsporingsniveau, inclusief aanvraag-/antwoordteksten en niet-bewerkte headers, kan worden ingeschakeld op een client met het logging_enable
argument:
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Op dezelfde manier kan logging_enable
logboekregistratie voor één bewerking inschakelen, zelfs wanneer dit niet is ingeschakeld voor de client:
encoder.encode(dict_content, schema=definition, logging_enable=True)
Volgende stappen
Meer voorbeeldcode
Meer voorbeelden van veelvoorkomende Avro Encoder-scenario's voor Azure Schema Registry bevinden zich in de map met voorbeelden .
Bijdragen
Wij verwelkomen bijdragen en suggesties voor dit project. Voor de meeste bijdragen moet u instemmen met een licentieovereenkomst voor bijdragers (CLA: Contributor License Agreement) waarin u verklaart dat u gerechtigd bent ons het recht te geven uw bijdrage te gebruiken, en dat u dit ook doet. Ga naar https://cla.microsoft.com voor meer informatie.
Wanneer u een pull-aanvraag indient, wordt met een CLA-bot automatisch bepaald of u een CLA moet verschaffen en wordt de pull-aanvraag dienovereenkomstig opgemaakt (bijvoorbeeld met een label of commentaar). Volg gewoon de instructies van de bot. U hoeft dit maar eenmaal te doen voor alle repo's waar gebruik wordt gemaakt van onze CLA.
Op dit project is de Microsoft Open Source Code of Conduct (Microsoft Open Source-gedragscode) van toepassing. Zie de Veelgestelde vragen over de gedragscode voor meer informatie of neem contact op opencode@microsoft.com met eventuele aanvullende vragen of opmerkingen.
Azure SDK for Python