Delen via


Azure Schema Registry Avro Encoder-clientbibliotheek voor Python - versie 1.0.0

Azure Schema Registry is een service voor schemaopslagplaats die wordt gehost door Azure Event Hubs en die schema-opslag, versiebeheer en beheer biedt. Dit pakket biedt een Avro-encoder die payloads met schemaregisterschema-id's en avro-gecodeerde inhoud kan coderen en decoderen.

Broncode | Pakket (PyPi) | API-referentiedocumentatie | Monsters | Changelog

Disclaimer

Ondersteuning voor Azure SDK Python-pakketten voor Python 2.7 is beëindigd op 1 januari 2022. Voor meer informatie en vragen raadpleegt u https://github.com/Azure/azure-sdk-for-python/issues/20691

Aan de slag

Het pakket installeren

Installeer de Azure Schema Registry Avro Encoder-clientbibliotheek voor Python met pip:

pip install azure-schemaregistry-avroencoder

Vereisten:

Als u dit pakket wilt gebruiken, hebt u het volgende nodig:

De client verifiëren

Interactie met de Avro-encoder van het schemaregister begint met een exemplaar van de klasse AvroEncoder, waarbij de naam van de schemagroep en de clientklasse schemaregister worden gebruikt. De clientconstructor neemt de Volledig gekwalificeerde Event Hubs-naamruimte en azure Active Directory-referenties op:

  • De volledig gekwalificeerde naamruimte van het schemaregisterexemplaren moet de volgende indeling hebben: <yournamespace>.servicebus.windows.net.

  • Een AAD-referentie waarmee het TokenCredential-protocol wordt geïmplementeerd, moet worden doorgegeven aan de constructor. Er zijn implementaties van het TokenCredential protocol beschikbaar in het azure-identity-pakket. Als u de referentietypen wilt gebruiken die worden geleverd door azure-identity, installeert u de Azure Identity-clientbibliotheek voor Python met pip:

pip install azure-identity
  • Als u de asynchrone API wilt gebruiken, moet u bovendien eerst een asynchroon transport installeren, zoals aiohttp:
pip install aiohttp

AvroEncoder maken met behulp van de bibliotheek azure-schemaregistry:

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Belangrijkste concepten

AvroEncoder

Biedt API voor het coderen naar en decoderen van Binaire Avro-codering plus een inhoudstype met schema-id. Maakt gebruik van SchemaRegistryClient om schema-id's op te halen uit schema-inhoud of omgekeerd.

Ondersteunde berichtmodellen

Er is ondersteuning toegevoegd aan bepaalde Azure Messaging SDK-modelklassen voor interoperabiliteit met de AvroEncoder. Deze modellen zijn subtypen van het MessageType protocol dat is gedefinieerd onder de azure.schemaregistry.encoder.avroencoder naamruimte. Momenteel zijn de ondersteunde modelklassen:

  • azure.eventhub.EventData Voor azure-eventhub>=5.9.0

Berichtindeling

Als een berichttype dat volgt op het MessageType-protocol wordt geleverd aan de encoder voor codering, worden de bijbehorende eigenschappen van inhoud en inhoudstype ingesteld, waarbij:

  • content: Avro-nettolading (in het algemeen, indelingsspecifieke nettolading)

    • Binaire Avro-codering
    • NOT Avro-objectcontainerbestand, dat het schema bevat en het doel van deze encoder teniet doet om het schema uit de nettolading van het bericht naar het schemaregister te verplaatsen.
  • content type: een tekenreeks met de indeling avro/binary+<schema ID>, waarbij:

    • avro/binary is de indelingsindicator
    • <schema ID> is de hexadecimale weergave van GUID, dezelfde indeling en bytevolgorde als de tekenreeks van de Schema Registry-service.

Als EventData wordt doorgegeven als het berichttype, worden de volgende eigenschappen ingesteld voor het EventData -object:

  • De body eigenschap wordt ingesteld op de inhoudswaarde.
  • De content_type eigenschap wordt ingesteld op de waarde van het inhoudstype.

Als het berichttype niet is opgegeven en standaard, maakt de encoder het volgende dict: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

Voorbeelden

De volgende secties bevatten verschillende codefragmenten voor enkele van de meest voorkomende schemaregistertaken, waaronder:

Encoding

Gebruik de AvroEncoder.encode methode om inhoud te coderen met het opgegeven Avro-schema. De methode gebruikt een schema dat eerder is geregistreerd bij de Schema Registry-service en houdt het schema in de cache voor toekomstig coderingsgebruik. Om te voorkomen dat het schema vooraf wordt geregistreerd bij de service en het automatisch wordt geregistreerd met de encode methode, moet het trefwoordargument auto_register=True worden doorgegeven aan de AvroEncoder constructor.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

Decodering

Gebruik de AvroEncoder.decode methode om de met Avro gecodeerde inhoud te decoderen door:

  • Doorgeven van een berichtobject dat een subtype van het MessageType-protocol is.
  • Een dict doorgeven met sleutels content(type bytes) en content_type (type tekenreeks). Met de methode wordt het schema automatisch opgehaald uit de schemaregisterservice en wordt het schema in de cache opgeslagen voor toekomstig decoderingsgebruik.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Integratie van verzenden van Event Hubs

Integratie met Event Hubs voor het verzenden van een EventData object met body ingesteld op Avro-gecodeerde inhoud en de bijbehorende content_type.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Integratie van Event Hubs

Integratie met Event Hubs om een EventData object te ontvangen en de met Avro gecodeerde body waarde te decoderen.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Problemen oplossen

Algemeen

Azure Schema Registry Avro Encoder genereert uitzonderingen die zijn gedefinieerd in Azure Core als er fouten zijn opgetreden bij de communicatie met de Schema Registry-service. Fouten met betrekking tot ongeldige inhouds-/inhoudstypen en ongeldige schema's worden gegenereerd als azure.schemaregistry.encoder.avroencoder.InvalidContentError respectievelijk en azure.schemaregistry.encoder.avroencoder.InvalidSchemaError, waarbij __cause__ de onderliggende uitzondering wordt weergegeven door de Apache Avro-bibliotheek.

Logboekregistratie

Deze bibliotheek maakt gebruik van de standaardbibliotheek voor logboekregistratie voor logboekregistratie. Basisinformatie over HTTP-sessies (URL's, headers, enzovoort) wordt geregistreerd op INFO-niveau.

Gedetailleerde logboekregistratie op foutopsporingsniveau, inclusief aanvraag-/antwoordteksten en niet-bewerkte headers, kan worden ingeschakeld op een client met het logging_enable argument:

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Op dezelfde manier kan logging_enable logboekregistratie voor één bewerking inschakelen, zelfs wanneer dit niet is ingeschakeld voor de client:

encoder.encode(dict_content, schema=definition, logging_enable=True)

Volgende stappen

Meer voorbeeldcode

Meer voorbeelden van veelvoorkomende Avro Encoder-scenario's voor Azure Schema Registry bevinden zich in de map met voorbeelden .

Bijdragen

Wij verwelkomen bijdragen en suggesties voor dit project. Voor de meeste bijdragen moet u instemmen met een licentieovereenkomst voor bijdragers (CLA: Contributor License Agreement) waarin u verklaart dat u gerechtigd bent ons het recht te geven uw bijdrage te gebruiken, en dat u dit ook doet. Ga naar https://cla.microsoft.com voor meer informatie.

Wanneer u een pull-aanvraag indient, wordt met een CLA-bot automatisch bepaald of u een CLA moet verschaffen en wordt de pull-aanvraag dienovereenkomstig opgemaakt (bijvoorbeeld met een label of commentaar). Volg gewoon de instructies van de bot. U hoeft dit maar eenmaal te doen voor alle repo's waar gebruik wordt gemaakt van onze CLA.

Op dit project is de Microsoft Open Source Code of Conduct (Microsoft Open Source-gedragscode) van toepassing. Zie de Veelgestelde vragen over de gedragscode voor meer informatie of neem contact op opencode@microsoft.com met eventuele aanvullende vragen of opmerkingen.