Udostępnij za pośrednictwem


Biblioteka klienta programu Azure Schema Registry Avro Serializer dla języka Python — wersja 1.0.0b4

Usługa Azure Schema Registry to usługa repozytorium schematów hostowana przez Azure Event Hubs, zapewniając magazyn schematów, przechowywanie wersji i zarządzanie nimi. Ten pakiet zapewnia serializator Avro umożliwiający serializowanie i deserializacji ładunków zawierających identyfikatory schematu rejestru schematów i dane zakodowane w formacie Avro.

Kod | źródłowy Pakiet (PyPi) | Dokumentacja referencyjna interfejsu | API Próbki | Changelog

Zrzeczenie odpowiedzialności

Obsługa pakietów języka Python zestawu Azure SDK dla języka Python 2.7 kończy się 01 stycznia 2022 r. Aby uzyskać więcej informacji i pytań, zapoznaj się z artykułem https://github.com/Azure/azure-sdk-for-python/issues/20691

Wprowadzenie

Instalowanie pakietu

Zainstaluj bibliotekę klienta programu Azure Schema Registry Avro Serializer i bibliotekę klienta tożsamości platformy Azure dla języka Python przy użyciu narzędzia pip:

pip install azure-schemaregistry-avroserializer azure-identity

Wymagania wstępne:

Aby użyć tego pakietu, musisz mieć następujące elementy:

Uwierzytelnianie klienta

Interakcja z rejestrem schematów Avro Serializer rozpoczyna się od wystąpienia klasy AvroSerializer, która przyjmuje nazwę grupy schematów i klasę klienta rejestru schematów . Konstruktor klienta przyjmuje w pełni kwalifikowaną przestrzeń nazw usługi Event Hubs i poświadczenia usługi Azure Active Directory:

  • W pełni kwalifikowana przestrzeń nazw wystąpienia rejestru schematów powinna mieć następujący format: <yournamespace>.servicebus.windows.net.

  • Do konstruktora należy przekazać poświadczenie usługi AAD, które implementuje protokół TokenCredential . Istnieją implementacje TokenCredential protokołu dostępnego w pakiecie azure-identity. Aby użyć typów poświadczeń dostarczonych przez azure-identityprogram , zainstaluj bibliotekę klienta tożsamości platformy Azure dla języka Python za pomocą narzędzia pip:

pip install azure-identity
  • Ponadto aby korzystać z interfejsu API asynchronicznego obsługiwanego w języku Python 3.6 lub nowszym, musisz najpierw zainstalować transport asynchroniczny, taki jak aiohttp:
pip install aiohttp

Utwórz narzędzie AvroSerializer przy użyciu biblioteki azure-schemaregistry:

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

Kluczowe pojęcia

AvroSerializer

Udostępnia interfejs API do serializacji i deserializacji z kodowania binarnego Avro oraz nagłówka o identyfikatorze schematu. Używa elementu SchemaRegistryClient , aby uzyskać identyfikatory schematu z zawartości schematu lub na odwrót.

Format wiadomości

Ten sam format jest używany przez serializatory rejestru schematów w językach zestawu Azure SDK.

Komunikaty są kodowane w następujący sposób:

  • 4 bajty: wskaźnik formatu

    • Obecnie zawsze zero, aby wskazać format poniżej.
  • 32 bajty: identyfikator schematu

    • Reprezentacja szesnastkowa identyfikatora GUID UTF-8.
    • 32 cyfry szesnastkowej, bez łączników.
    • Ten sam format i kolejność bajtów co ciąg z usługi Rejestru schematów.
  • Pozostałe bajty: ładunek Avro (ogólnie ładunek specyficzny dla formatu)

    • Kodowanie binarne Avro
    • NIE Avro Object Container File, który zawiera schemat i usuwa cel tego serialzera, aby przenieść schemat z ładunku komunikatu i do rejestru schematów.

Przykłady

W poniższych sekcjach przedstawiono kilka fragmentów kodu obejmujących niektóre z najbardziej typowych zadań rejestru schematów, w tym:

Serializacja

Użyj AvroSerializer.serialize metody , aby serializować dane dyktowania przy użyciu danego schematu avro. Metoda będzie używać schematu wcześniej zarejestrowanego w usłudze Rejestru schematów i zachować schemat buforowany na potrzeby przyszłego użycia serializacji. Istnieje również możliwość uniknięcia wstępnego zarejestrowania schematu w usłudze i automatycznego zarejestrowania serialize przy użyciu metody przez utworzenie wystąpienia AvroSerializer elementu za pomocą argumentu auto_register_schemas=Truesłowa kluczowego .

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    encoded_bytes = serializer.serialize(dict_data, schema=definition)

Deserializacji

Użyj AvroSerializer.deserialize metody do deserializacji nieprzetworzonych bajtów do danych dyktowania. Metoda automatycznie pobiera schemat z usługi rejestru schematów i przechowuje schemat buforowany na potrzeby przyszłego użycia deserializacji.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
    decoded_data = serializer.deserialize(encoded_bytes)

Integracja wysyłania usługi Event Hubs

Integracja z usługą Event Hubs w celu wysyłania serializowanych danych avro dict jako treści eventData.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
    event_data_batch = eventhub_producer.create_batch()
    dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
    event_data_batch.add(EventData(body=payload_bytes))
    eventhub_producer.send_batch(event_data_batch)

Odbieranie integracji z usługą Event Hubs

Integracja z usługą Event Hubs w celu odbierania EventData i deserializacji nieprzetworzonych bajtów do danych avro dict.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    bytes_payload = b"".join(b for b in event.body)
    deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Rozwiązywanie problemów

Ogólne

Program Serializator avro rejestru schematów platformy Azure zgłasza wyjątki zdefiniowane w usłudze Azure Core.

Rejestrowanie

Ta biblioteka używa standardowej biblioteki rejestrowania do rejestrowania. Podstawowe informacje o sesjach HTTP (adresach URL, nagłówkach itp.) są rejestrowane na poziomie INFORMACJI.

Szczegółowe rejestrowanie na poziomie DEBUG, w tym treści żądań/odpowiedzi i nieredagowanych nagłówków, można włączyć na kliencie z argumentem logging_enable :

import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")

logging_enable Podobnie można włączyć szczegółowe rejestrowanie dla pojedynczej operacji, nawet jeśli nie jest włączone dla klienta:

serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)

Następne kroki

Więcej przykładów kodu

Więcej przykładów można znaleźć w katalogu przykładów pokazujących typowe scenariusze serializatora avro rejestru schematów platformy Azure.

Współtworzenie

W tym projekcie zachęcamy do współtworzenia i zgłaszania sugestii. Współtworzenie w większości przypadków wymaga zgody na umowę licencyjną dotyczącą współautorów (CLA, Contributor License Agreement), zgodnie z którą współautor ma prawo udzielić i faktycznie udziela nam praw do używania wytworzonej przez siebie zawartości. Aby uzyskać szczegółowe informacje, odwiedź stronę https://cla.microsoft.com.

Po przesłaniu żądania ściągnięcia robot CLA automatycznie określi, czy musisz przekazać umowę CLA, i doda odpowiednie informacje do tego żądania (na przykład etykietę czy komentarz). Po prostu postępuj zgodnie z instrukcjami robota. Wystarczy zrobić to raz dla wszystkich repozytoriów, w przypadku których jest używana nasza umowa CLA.

W tym projekcie przyjęto Kodeks postępowania oprogramowania Open Source firmy Microsoft. Aby uzyskać więcej informacji, zobacz Często zadawane pytania dotyczące kodeksu postępowania lub skontaktuj się z opencode@microsoft.com dodatkowymi pytaniami lub komentarzami.