Udostępnij za pośrednictwem


Biblioteka klienta usługi Azure Schema Registry Avro Encoder dla języka Python — wersja 1.0.0

Usługa Azure Schema Registry to usługa repozytorium schematów hostowana przez Azure Event Hubs, zapewniając magazyn schematów, przechowywanie wersji i zarządzanie nimi. Ten pakiet zapewnia koder Avro umożliwiający kodowanie i dekodowanie ładunków zawierających identyfikatory schematu rejestru schematów i zawartość zakodowaną w formacie Avro.

Kod | źródłowy Pakiet (PyPi) | Dokumentacja referencyjna interfejsu | API Próbki | Changelog

Zrzeczenie odpowiedzialności

Obsługa pakietów języka Python dla zestawu Azure SDK dla języka Python 2.7 zakończyła się 1 stycznia 2022 r. Aby uzyskać więcej informacji i pytań, zapoznaj się z artykułem https://github.com/Azure/azure-sdk-for-python/issues/20691

Wprowadzenie

Instalowanie pakietu

Zainstaluj bibliotekę klienta usługi Azure Schema Registry Avro Encoder dla języka Python za pomocą narzędzia pip:

pip install azure-schemaregistry-avroencoder

Wymagania wstępne:

Aby użyć tego pakietu, musisz mieć następujące elementy:

Uwierzytelnianie klienta

Interakcja z koderem Avro Rejestru schematów rozpoczyna się od wystąpienia klasy AvroEncoder, która przyjmuje nazwę grupy schematów i klasę klienta rejestru schematów . Konstruktor klienta przyjmuje w pełni kwalifikowaną przestrzeń nazw usługi Event Hubs i poświadczenia usługi Azure Active Directory:

  • W pełni kwalifikowana przestrzeń nazw wystąpienia rejestru schematów powinna mieć następujący format: <yournamespace>.servicebus.windows.net.

  • Do konstruktora należy przekazać poświadczenie usługi AAD, które implementuje protokół TokenCredential . Istnieją implementacje TokenCredential protokołu dostępnego w pakiecie azure-identity. Aby użyć typów poświadczeń dostarczonych przez azure-identityprogram , zainstaluj bibliotekę klienta tożsamości platformy Azure dla języka Python za pomocą narzędzia pip:

pip install azure-identity
  • Ponadto aby użyć interfejsu API asynchronicznego, należy najpierw zainstalować transport asynchroniczny, taki jak aiohttp:
pip install aiohttp

Utwórz narzędzie AvroEncoder przy użyciu biblioteki azure-schemaregistry:

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Kluczowe pojęcia

AvroEncoder

Udostępnia interfejs API umożliwiający kodowanie i dekodowanie z kodowania binarnego Avro oraz typ zawartości z identyfikatorem schematu. Używa elementu SchemaRegistryClient , aby uzyskać identyfikatory schematu z zawartości schematu lub na odwrót.

Obsługiwane modele komunikatów

Dodano obsługę niektórych klas modeli zestawu Azure Messaging SDK na potrzeby współdziałania z usługą AvroEncoder. Te modele są podtypami protokołu zdefiniowanego MessageType w azure.schemaregistry.encoder.avroencoder przestrzeni nazw. Obecnie obsługiwane klasy modeli to:

  • azure.eventhub.EventData For azure-eventhub>=5.9.0

Format wiadomości

Jeśli typ komunikatu zgodny z protokołem MessageType jest dostarczany do kodera na potrzeby kodowania, ustawi odpowiednie właściwości zawartości i typu zawartości, gdzie:

  • content: Ładunek Avro (ogólnie ładunek specyficzny dla formatu)

    • Kodowanie binarne Avro
    • NIE Avro Object Container File, który zawiera schemat i usuwa cel tego kodera, aby przenieść schemat z ładunku komunikatu i do rejestru schematów.
  • content type: ciąg formatu avro/binary+<schema ID>, gdzie:

    • avro/binary to wskaźnik formatu
    • <schema ID> to reprezentacja szesnastkowa identyfikatora GUID, tego samego formatu i kolejności bajtów co ciąg z usługi Rejestru schematów.

Jeśli EventData element jest przekazywany jako typ komunikatu EventData , dla obiektu zostaną ustawione następujące właściwości:

  • Właściwość body zostanie ustawiona na wartość zawartości.
  • Właściwość content_type zostanie ustawiona na wartość typu zawartości.

Jeśli typ komunikatu nie zostanie podany i domyślnie koder utworzy następujący dykt: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

Przykłady

W poniższych sekcjach przedstawiono kilka fragmentów kodu obejmujących niektóre z najbardziej typowych zadań rejestru schematów, w tym:

Encoding

AvroEncoder.encode Użyj metody , aby zakodować zawartość przy użyciu danego schematu Avro. Metoda będzie używać schematu wcześniej zarejestrowanego w usłudze Rejestru schematów i zachować schemat buforowany na potrzeby przyszłego użycia kodowania. Aby uniknąć wstępnego rejestrowania schematu w usłudze i automatycznego rejestrowania go za encode pomocą metody, argument auto_register=True słowa kluczowego powinien zostać przekazany do konstruktora AvroEncoder .

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

Dekodowania

AvroEncoder.decode Użyj metody , aby zdekodować zawartość zakodowaną w formacie Avro za pomocą jednej z następujących metod:

  • Przekazywanie obiektu komunikatu, który jest podtypem protokołu MessageType.
  • Przekazywanie dykt z kluczami content(bajtami typu) i content_type (ciąg typu). Metoda automatycznie pobiera schemat z usługi rejestru schematów i przechowuje schemat buforowany na potrzeby przyszłego użycia dekodowania.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Integracja wysyłania usługi Event Hubs

Integracja z usługą Event Hubs w celu wysyłania EventData obiektu z ustawioną zawartością zakodowaną body w formacie Avro i odpowiadającym mu elementem content_type.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Odbieranie integracji z usługą Event Hubs

Integracja z usługą Event Hubs w celu odbierania EventData obiektu i dekodowania wartości zakodowanej body w formacie Avro.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Rozwiązywanie problemów

Ogólne

Usługa Azure Schema Registry Avro Encoder zgłasza wyjątki zdefiniowane w usłudze Azure Core , jeśli występują błędy podczas komunikowania się z usługą Rejestru schematów. Błędy związane z nieprawidłowymi typami zawartości/zawartości i nieprawidłowymi schematami zostaną zgłoszone odpowiednio jako azure.schemaregistry.encoder.avroencoder.InvalidContentError i azure.schemaregistry.encoder.avroencoder.InvalidSchemaError, gdzie __cause__ będą zawierać podstawowy wyjątek zgłoszony przez bibliotekę Apache Avro.

Rejestrowanie

Ta biblioteka używa standardowej biblioteki rejestrowania do rejestrowania. Podstawowe informacje o sesjach HTTP (adresach URL, nagłówkach itp.) są rejestrowane na poziomie INFORMACJI.

Szczegółowe rejestrowanie na poziomie DEBUG, w tym treści żądań/odpowiedzi i nieredagowanych nagłówków, można włączyć na kliencie z argumentem logging_enable :

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

logging_enable Podobnie można włączyć szczegółowe rejestrowanie dla pojedynczej operacji, nawet jeśli nie jest włączone dla klienta:

encoder.encode(dict_content, schema=definition, logging_enable=True)

Następne kroki

Więcej przykładów kodu

Dalsze przykłady pokazujące typowe scenariusze kodera Avro usługi Azure Schema Registry znajdują się w katalogu samples .

Współtworzenie

W tym projekcie zachęcamy do współtworzenia i zgłaszania sugestii. Współtworzenie w większości przypadków wymaga zgody na umowę licencyjną dotyczącą współautorów (CLA, Contributor License Agreement), zgodnie z którą współautor ma prawo udzielić i faktycznie udziela nam praw do używania wytworzonej przez siebie zawartości. Aby uzyskać szczegółowe informacje, odwiedź stronę https://cla.microsoft.com.

Po przesłaniu żądania ściągnięcia robot CLA automatycznie określi, czy musisz przekazać umowę CLA, i doda odpowiednie informacje do tego żądania (na przykład etykietę czy komentarz). Po prostu postępuj zgodnie z instrukcjami robota. Wystarczy zrobić to raz dla wszystkich repozytoriów, w przypadku których jest używana nasza umowa CLA.

W tym projekcie przyjęto Kodeks postępowania oprogramowania Open Source firmy Microsoft. Aby uzyskać więcej informacji, zobacz Często zadawane pytania dotyczące kodeksu postępowania lub skontaktuj się z opencode@microsoft.com dodatkowymi pytaniami lub komentarzami.