Biblioteka klienta usługi Azure Schema Registry Avro Encoder dla języka Python — wersja 1.0.0
Usługa Azure Schema Registry to usługa repozytorium schematów hostowana przez Azure Event Hubs, zapewniając magazyn schematów, przechowywanie wersji i zarządzanie nimi. Ten pakiet zapewnia koder Avro umożliwiający kodowanie i dekodowanie ładunków zawierających identyfikatory schematu rejestru schematów i zawartość zakodowaną w formacie Avro.
Kod | źródłowy Pakiet (PyPi) | Dokumentacja referencyjna interfejsu | API Próbki | Changelog
Zrzeczenie odpowiedzialności
Obsługa pakietów języka Python dla zestawu Azure SDK dla języka Python 2.7 zakończyła się 1 stycznia 2022 r. Aby uzyskać więcej informacji i pytań, zapoznaj się z artykułem https://github.com/Azure/azure-sdk-for-python/issues/20691
Wprowadzenie
Instalowanie pakietu
Zainstaluj bibliotekę klienta usługi Azure Schema Registry Avro Encoder dla języka Python za pomocą narzędzia pip:
pip install azure-schemaregistry-avroencoder
Wymagania wstępne:
Aby użyć tego pakietu, musisz mieć następujące elementy:
- Subskrypcja platformy Azure — można utworzyć bezpłatne konto
- Rejestr - schematów platformy AzureOto przewodnik Szybki start dotyczący tworzenia grupy rejestru schematów przy użyciu Azure Portal.
- Środowisko Python w wersji 3.6 lub nowszej — instalowanie języka Python
Uwierzytelnianie klienta
Interakcja z koderem Avro Rejestru schematów rozpoczyna się od wystąpienia klasy AvroEncoder, która przyjmuje nazwę grupy schematów i klasę klienta rejestru schematów . Konstruktor klienta przyjmuje w pełni kwalifikowaną przestrzeń nazw usługi Event Hubs i poświadczenia usługi Azure Active Directory:
W pełni kwalifikowana przestrzeń nazw wystąpienia rejestru schematów powinna mieć następujący format:
<yournamespace>.servicebus.windows.net
.Do konstruktora należy przekazać poświadczenie usługi AAD, które implementuje protokół TokenCredential . Istnieją implementacje
TokenCredential
protokołu dostępnego w pakiecie azure-identity. Aby użyć typów poświadczeń dostarczonych przezazure-identity
program , zainstaluj bibliotekę klienta tożsamości platformy Azure dla języka Python za pomocą narzędzia pip:
pip install azure-identity
- Ponadto aby użyć interfejsu API asynchronicznego, należy najpierw zainstalować transport asynchroniczny, taki jak aiohttp:
pip install aiohttp
Utwórz narzędzie AvroEncoder przy użyciu biblioteki azure-schemaregistry:
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Kluczowe pojęcia
AvroEncoder
Udostępnia interfejs API umożliwiający kodowanie i dekodowanie z kodowania binarnego Avro oraz typ zawartości z identyfikatorem schematu. Używa elementu SchemaRegistryClient , aby uzyskać identyfikatory schematu z zawartości schematu lub na odwrót.
Obsługiwane modele komunikatów
Dodano obsługę niektórych klas modeli zestawu Azure Messaging SDK na potrzeby współdziałania z usługą AvroEncoder
. Te modele są podtypami protokołu zdefiniowanego MessageType
w azure.schemaregistry.encoder.avroencoder
przestrzeni nazw. Obecnie obsługiwane klasy modeli to:
azure.eventhub.EventData
Forazure-eventhub>=5.9.0
Format wiadomości
Jeśli typ komunikatu zgodny z protokołem MessageType jest dostarczany do kodera na potrzeby kodowania, ustawi odpowiednie właściwości zawartości i typu zawartości, gdzie:
content
: Ładunek Avro (ogólnie ładunek specyficzny dla formatu)- Kodowanie binarne Avro
- NIE Avro Object Container File, który zawiera schemat i usuwa cel tego kodera, aby przenieść schemat z ładunku komunikatu i do rejestru schematów.
content type
: ciąg formatuavro/binary+<schema ID>
, gdzie:avro/binary
to wskaźnik formatu<schema ID>
to reprezentacja szesnastkowa identyfikatora GUID, tego samego formatu i kolejności bajtów co ciąg z usługi Rejestru schematów.
Jeśli EventData
element jest przekazywany jako typ komunikatu EventData
, dla obiektu zostaną ustawione następujące właściwości:
- Właściwość
body
zostanie ustawiona na wartość zawartości. - Właściwość
content_type
zostanie ustawiona na wartość typu zawartości.
Jeśli typ komunikatu nie zostanie podany i domyślnie koder utworzy następujący dykt: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
Przykłady
W poniższych sekcjach przedstawiono kilka fragmentów kodu obejmujących niektóre z najbardziej typowych zadań rejestru schematów, w tym:
- Kodowanie
- Dekodowania
- Integracja wysyłania usługi Event Hubs
- Odbieranie integracji z usługą Event Hubs
Encoding
AvroEncoder.encode
Użyj metody , aby zakodować zawartość przy użyciu danego schematu Avro.
Metoda będzie używać schematu wcześniej zarejestrowanego w usłudze Rejestru schematów i zachować schemat buforowany na potrzeby przyszłego użycia kodowania. Aby uniknąć wstępnego rejestrowania schematu w usłudze i automatycznego rejestrowania go za encode
pomocą metody, argument auto_register=True
słowa kluczowego powinien zostać przekazany do konstruktora AvroEncoder
.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Dekodowania
AvroEncoder.decode
Użyj metody , aby zdekodować zawartość zakodowaną w formacie Avro za pomocą jednej z następujących metod:
- Przekazywanie obiektu komunikatu, który jest podtypem protokołu MessageType.
- Przekazywanie dykt z kluczami
content
(bajtami typu) icontent_type
(ciąg typu). Metoda automatycznie pobiera schemat z usługi rejestru schematów i przechowuje schemat buforowany na potrzeby przyszłego użycia dekodowania.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Integracja wysyłania usługi Event Hubs
Integracja z usługą Event Hubs w celu wysyłania EventData
obiektu z ustawioną zawartością zakodowaną body
w formacie Avro i odpowiadającym mu elementem content_type
.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Odbieranie integracji z usługą Event Hubs
Integracja z usługą Event Hubs w celu odbierania EventData
obiektu i dekodowania wartości zakodowanej body
w formacie Avro.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Rozwiązywanie problemów
Ogólne
Usługa Azure Schema Registry Avro Encoder zgłasza wyjątki zdefiniowane w usłudze Azure Core , jeśli występują błędy podczas komunikowania się z usługą Rejestru schematów. Błędy związane z nieprawidłowymi typami zawartości/zawartości i nieprawidłowymi schematami zostaną zgłoszone odpowiednio jako azure.schemaregistry.encoder.avroencoder.InvalidContentError
i azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
, gdzie __cause__
będą zawierać podstawowy wyjątek zgłoszony przez bibliotekę Apache Avro.
Rejestrowanie
Ta biblioteka używa standardowej biblioteki rejestrowania do rejestrowania. Podstawowe informacje o sesjach HTTP (adresach URL, nagłówkach itp.) są rejestrowane na poziomie INFORMACJI.
Szczegółowe rejestrowanie na poziomie DEBUG, w tym treści żądań/odpowiedzi i nieredagowanych nagłówków, można włączyć na kliencie z argumentem logging_enable
:
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
logging_enable
Podobnie można włączyć szczegółowe rejestrowanie dla pojedynczej operacji, nawet jeśli nie jest włączone dla klienta:
encoder.encode(dict_content, schema=definition, logging_enable=True)
Następne kroki
Więcej przykładów kodu
Dalsze przykłady pokazujące typowe scenariusze kodera Avro usługi Azure Schema Registry znajdują się w katalogu samples .
Współtworzenie
W tym projekcie zachęcamy do współtworzenia i zgłaszania sugestii. Współtworzenie w większości przypadków wymaga zgody na umowę licencyjną dotyczącą współautorów (CLA, Contributor License Agreement), zgodnie z którą współautor ma prawo udzielić i faktycznie udziela nam praw do używania wytworzonej przez siebie zawartości. Aby uzyskać szczegółowe informacje, odwiedź stronę https://cla.microsoft.com.
Po przesłaniu żądania ściągnięcia robot CLA automatycznie określi, czy musisz przekazać umowę CLA, i doda odpowiednie informacje do tego żądania (na przykład etykietę czy komentarz). Po prostu postępuj zgodnie z instrukcjami robota. Wystarczy zrobić to raz dla wszystkich repozytoriów, w przypadku których jest używana nasza umowa CLA.
W tym projekcie przyjęto Kodeks postępowania oprogramowania Open Source firmy Microsoft. Aby uzyskać więcej informacji, zobacz Często zadawane pytania dotyczące kodeksu postępowania lub skontaktuj się z opencode@microsoft.com dodatkowymi pytaniami lub komentarzami.
Azure SDK for Python