Biblioteka klienta programu Azure Schema Registry Avro Serializer dla języka Python — wersja 1.0.0b4
Usługa Azure Schema Registry to usługa repozytorium schematów hostowana przez Azure Event Hubs, zapewniając magazyn schematów, przechowywanie wersji i zarządzanie nimi. Ten pakiet zapewnia serializator Avro umożliwiający serializowanie i deserializacji ładunków zawierających identyfikatory schematu rejestru schematów i dane zakodowane w formacie Avro.
Kod | źródłowy Pakiet (PyPi) | Dokumentacja referencyjna interfejsu | API Próbki | Changelog
Zrzeczenie odpowiedzialności
Obsługa pakietów języka Python zestawu Azure SDK dla języka Python 2.7 kończy się 01 stycznia 2022 r. Aby uzyskać więcej informacji i pytań, zapoznaj się z artykułem https://github.com/Azure/azure-sdk-for-python/issues/20691
Wprowadzenie
Instalowanie pakietu
Zainstaluj bibliotekę klienta programu Azure Schema Registry Avro Serializer i bibliotekę klienta tożsamości platformy Azure dla języka Python przy użyciu narzędzia pip:
pip install azure-schemaregistry-avroserializer azure-identity
Wymagania wstępne:
Aby użyć tego pakietu, musisz mieć następujące elementy:
- Subskrypcja platformy Azure — można utworzyć bezpłatne konto
- Rejestr schematów platformy Azure
- Python 2.7, 3.6 lub nowszy — instalowanie języka Python
Uwierzytelnianie klienta
Interakcja z rejestrem schematów Avro Serializer rozpoczyna się od wystąpienia klasy AvroSerializer, która przyjmuje nazwę grupy schematów i klasę klienta rejestru schematów . Konstruktor klienta przyjmuje w pełni kwalifikowaną przestrzeń nazw usługi Event Hubs i poświadczenia usługi Azure Active Directory:
W pełni kwalifikowana przestrzeń nazw wystąpienia rejestru schematów powinna mieć następujący format:
<yournamespace>.servicebus.windows.net
.Do konstruktora należy przekazać poświadczenie usługi AAD, które implementuje protokół TokenCredential . Istnieją implementacje
TokenCredential
protokołu dostępnego w pakiecie azure-identity. Aby użyć typów poświadczeń dostarczonych przezazure-identity
program , zainstaluj bibliotekę klienta tożsamości platformy Azure dla języka Python za pomocą narzędzia pip:
pip install azure-identity
- Ponadto aby korzystać z interfejsu API asynchronicznego obsługiwanego w języku Python 3.6 lub nowszym, musisz najpierw zainstalować transport asynchroniczny, taki jak aiohttp:
pip install aiohttp
Utwórz narzędzie AvroSerializer przy użyciu biblioteki azure-schemaregistry:
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
Kluczowe pojęcia
AvroSerializer
Udostępnia interfejs API do serializacji i deserializacji z kodowania binarnego Avro oraz nagłówka o identyfikatorze schematu. Używa elementu SchemaRegistryClient , aby uzyskać identyfikatory schematu z zawartości schematu lub na odwrót.
Format wiadomości
Ten sam format jest używany przez serializatory rejestru schematów w językach zestawu Azure SDK.
Komunikaty są kodowane w następujący sposób:
4 bajty: wskaźnik formatu
- Obecnie zawsze zero, aby wskazać format poniżej.
32 bajty: identyfikator schematu
- Reprezentacja szesnastkowa identyfikatora GUID UTF-8.
- 32 cyfry szesnastkowej, bez łączników.
- Ten sam format i kolejność bajtów co ciąg z usługi Rejestru schematów.
Pozostałe bajty: ładunek Avro (ogólnie ładunek specyficzny dla formatu)
- Kodowanie binarne Avro
- NIE Avro Object Container File, który zawiera schemat i usuwa cel tego serialzera, aby przenieść schemat z ładunku komunikatu i do rejestru schematów.
Przykłady
W poniższych sekcjach przedstawiono kilka fragmentów kodu obejmujących niektóre z najbardziej typowych zadań rejestru schematów, w tym:
- Serializacja
- Deserializacji
- Integracja wysyłania usługi Event Hubs
- Odbieranie integracji z usługą Event Hubs
Serializacja
Użyj AvroSerializer.serialize
metody , aby serializować dane dyktowania przy użyciu danego schematu avro.
Metoda będzie używać schematu wcześniej zarejestrowanego w usłudze Rejestru schematów i zachować schemat buforowany na potrzeby przyszłego użycia serializacji. Istnieje również możliwość uniknięcia wstępnego zarejestrowania schematu w usłudze i automatycznego zarejestrowania serialize
przy użyciu metody przez utworzenie wystąpienia AvroSerializer
elementu za pomocą argumentu auto_register_schemas=True
słowa kluczowego .
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
encoded_bytes = serializer.serialize(dict_data, schema=definition)
Deserializacji
Użyj AvroSerializer.deserialize
metody do deserializacji nieprzetworzonych bajtów do danych dyktowania.
Metoda automatycznie pobiera schemat z usługi rejestru schematów i przechowuje schemat buforowany na potrzeby przyszłego użycia deserializacji.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
decoded_data = serializer.deserialize(encoded_bytes)
Integracja wysyłania usługi Event Hubs
Integracja z usługą Event Hubs w celu wysyłania serializowanych danych avro dict jako treści eventData.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_serializer:
event_data_batch = eventhub_producer.create_batch()
dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
event_data_batch.add(EventData(body=payload_bytes))
eventhub_producer.send_batch(event_data_batch)
Odbieranie integracji z usługą Event Hubs
Integracja z usługą Event Hubs w celu odbierania EventData
i deserializacji nieprzetworzonych bajtów do danych avro dict.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
bytes_payload = b"".join(b for b in event.body)
deserialized_data = avro_serializer.deserialize(bytes_payload)
with eventhub_consumer, avro_serializer:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Rozwiązywanie problemów
Ogólne
Program Serializator avro rejestru schematów platformy Azure zgłasza wyjątki zdefiniowane w usłudze Azure Core.
Rejestrowanie
Ta biblioteka używa standardowej biblioteki rejestrowania do rejestrowania. Podstawowe informacje o sesjach HTTP (adresach URL, nagłówkach itp.) są rejestrowane na poziomie INFORMACJI.
Szczegółowe rejestrowanie na poziomie DEBUG, w tym treści żądań/odpowiedzi i nieredagowanych nagłówków, można włączyć na kliencie z argumentem logging_enable
:
import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")
logging_enable
Podobnie można włączyć szczegółowe rejestrowanie dla pojedynczej operacji, nawet jeśli nie jest włączone dla klienta:
serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)
Następne kroki
Więcej przykładów kodu
Więcej przykładów można znaleźć w katalogu przykładów pokazujących typowe scenariusze serializatora avro rejestru schematów platformy Azure.
Współtworzenie
W tym projekcie zachęcamy do współtworzenia i zgłaszania sugestii. Współtworzenie w większości przypadków wymaga zgody na umowę licencyjną dotyczącą współautorów (CLA, Contributor License Agreement), zgodnie z którą współautor ma prawo udzielić i faktycznie udziela nam praw do używania wytworzonej przez siebie zawartości. Aby uzyskać szczegółowe informacje, odwiedź stronę https://cla.microsoft.com.
Po przesłaniu żądania ściągnięcia robot CLA automatycznie określi, czy musisz przekazać umowę CLA, i doda odpowiednie informacje do tego żądania (na przykład etykietę czy komentarz). Po prostu postępuj zgodnie z instrukcjami robota. Wystarczy zrobić to raz dla wszystkich repozytoriów, w przypadku których jest używana nasza umowa CLA.
W tym projekcie przyjęto Kodeks postępowania oprogramowania Open Source firmy Microsoft. Aby uzyskać więcej informacji, zobacz Często zadawane pytania dotyczące kodeksu postępowania lub skontaktuj się z opencode@microsoft.com dodatkowymi pytaniami lub komentarzami.
Azure SDK for Python