Python용 Azure 스키마 레지스트리 Avro Serializer 클라이언트 라이브러리 - 버전 1.0.0b4

Azure Schema Registry는 Azure Event Hubs 호스팅하는 스키마 리포지토리 서비스로, 스키마 스토리지, 버전 관리 및 관리를 제공합니다. 이 패키지는 스키마 레지스트리 스키마 식별자 및 Avro로 인코딩된 데이터를 포함하는 페이로드를 직렬화하고 역직렬화할 수 있는 Avro 직렬 변환기를 제공합니다.

소스 코드 | 패키지(PyPi) | API 참조 설명서 | 샘플 | Changelog

고지 사항

Python 2.7에 대한 Azure SDK Python 패키지 지원은 2022년 1월 1일에 종료됩니다. 자세한 내용과 질문은 다음을 참조하세요. https://github.com/Azure/azure-sdk-for-python/issues/20691

시작

패키지 설치

pip를 사용하여 Python용 Azure 스키마 레지스트리 Avro Serializer 클라이언트 라이브러리 및 Azure Identity 클라이언트 라이브러리를 설치합니다.

pip install azure-schemaregistry-avroserializer azure-identity

필수 조건:

이 패키지를 사용하려면 다음이 있어야 합니다.

클라이언트 인증

스키마 레지스트리 Avro Serializer와의 상호 작용은 스키마 그룹 이름과 스키마 레지스트리 클라이언트 클래스를 사용하는 AvroSerializer 클래스의 인스턴스로 시작합니다. 클라이언트 생성자는 Event Hubs 정규화된 네임스페이스 및 Azure Active Directory 자격 증명을 사용합니다.

  • 스키마 레지스트리 인스턴스의 정규화된 네임스페이스는 형식 <yournamespace>.servicebus.windows.net을 따라야 합니다.

  • TokenCredential 프로토콜을 구현하는 AAD 자격 증명을 생성자에 전달해야 합니다. azure-identity 패키지에서 사용할 수 있는 프로토콜의 TokenCredential 구현이 있습니다. 가 제공하는 azure-identity자격 증명 형식을 사용하려면 pip를 사용하여 Python용 Azure ID 클라이언트 라이브러리를 설치하세요.

pip install azure-identity
  • 또한 Python 3.6 이상에서 지원되는 비동기 API를 사용하려면 먼저 aiohttp와 같은 비동기 전송을 설치해야 합니다.
pip install aiohttp

azure-schemaregistry 라이브러리를 사용하여 AvroSerializer를 만듭니다.

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

주요 개념

AvroSerializer

Avro 이진 인코딩에서 직렬화 및 역직렬화하는 API와 스키마 ID가 있는 헤더를 제공합니다. SchemaRegistryClient를 사용하여 스키마 콘텐츠에서 스키마 ID를 얻거나 그 반대의 경우도 마찬가지입니다.

메시지 형식

동일한 형식은 Azure SDK 언어의 스키마 레지스트리 직렬 변환기에서 사용됩니다.

메시지는 다음과 같이 인코딩됩니다.

  • 4바이트: 서식 표시기

    • 현재는 항상 아래 형식을 나타내기 위해 0입니다.
  • 32바이트: 스키마 ID

    • GUID의 UTF-8 16진수 표현입니다.
    • 32 16진수, 하이픈 없음.
    • 스키마 레지스트리 서비스의 문자열과 동일한 형식 및 바이트 순서입니다.
  • 나머지 바이트: Avro 페이로드(일반적으로 형식별 페이로드)

    • Avro 이진 인코딩
    • NOT Avro 개체 컨테이너 파일- 스키마를 포함하고 이 serialzer의 목적을 무너뜨려 스키마를 메시지 페이로드에서 스키마 레지스트리로 이동합니다.

예제

다음 섹션에서는 다음을 포함하여 가장 일반적인 스키마 레지스트리 작업 중 일부를 다루는 몇 가지 코드 조각을 제공합니다.

Serialization

메서드를 사용하여 AvroSerializer.serialize 지정된 avro 스키마를 사용하여 받아쓰기 데이터를 직렬화합니다. 메서드는 이전에 스키마 레지스트리 서비스에 등록된 스키마를 사용하고 향후 serialization 사용을 위해 스키마를 캐시된 상태로 유지합니다. 또한 키워드 인수 auto_register_schemas=True를 사용하여 를 인스턴스화하여 서비스에 스키마를 미리 등록하지 않고 메서드에 자동으로 등록 serializeAvroSerializer 수도 있습니다.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    encoded_bytes = serializer.serialize(dict_data, schema=definition)

Deserialization

메서드를 사용하여 AvroSerializer.deserialize 원시 바이트를 받아쓰기 데이터로 역직렬화합니다. 메서드는 스키마 레지스트리 서비스에서 스키마를 자동으로 검색하고 향후 역직렬화 사용을 위해 스키마를 캐시된 상태를 유지합니다.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
    decoded_data = serializer.deserialize(encoded_bytes)

Event Hubs 전송 통합

Event Hubs와 통합하여 직렬화된 avro dict 데이터를 EventData의 본문으로 보냅니다.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
    event_data_batch = eventhub_producer.create_batch()
    dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
    event_data_batch.add(EventData(body=payload_bytes))
    eventhub_producer.send_batch(event_data_batch)

Event Hubs 수신 통합

Event Hubs와 통합하여 원시 바이트를 수신 EventData 하고 avro dict 데이터로 역직렬화합니다.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    bytes_payload = b"".join(b for b in event.body)
    deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

문제 해결

일반

Azure 스키마 레지스트리 Avro Serializer는 Azure Core에 정의된 예외를 발생합니다.

로깅

이 라이브러리는 로깅에 표준 로깅 라이브러리를 사용합니다. HTTP 세션(URL, 헤더 등)에 대한 기본 정보는 INFO 수준에서 기록됩니다.

요청/응답 본문 및 수정되지 않은 헤더를 포함한 자세한 DEBUG 수준 로깅은 인수를 사용하여 클라이언트 logging_enable 에서 사용하도록 설정할 수 있습니다.

import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")

마찬가지로 logging_enable은 클라이언트에 대해 상세 로깅을 사용하지 않는 경우에도 한 작업에만 사용하게 설정할 수 있습니다.

serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)

다음 단계

추가 샘플 코드

일반적인 Azure 스키마 레지스트리 Avro Serializer 시나리오를 보여주는 샘플 디렉터리에서 추가 예제를 확인하세요.

참여

이 프로젝트에 대한 기여와 제안을 환영합니다. 대부분의 경우 기여하려면 권한을 부여하며 실제로 기여를 사용할 권한을 당사에 부여한다고 선언하는 CLA(기여자 라이선스 계약)에 동의해야 합니다. 자세한 내용은 https://cla.microsoft.com 을 참조하세요.

끌어오기 요청을 제출하면 CLA-bot은 CLA를 제공하고 PR을 적절하게 데코레이팅해야 하는지 여부를 자동으로 결정합니다(예: 레이블, 설명). 봇에서 제공하는 지침을 따르기만 하면 됩니다. 이 작업은 CLA를 사용하여 모든 리포지토리에서 한 번만 수행하면 됩니다.

이 프로젝트에는 Microsoft Open Source Code of Conduct(Microsoft 오픈 소스 준수 사항)가 적용됩니다. 자세한 내용은 Code of Conduct FAQ(규정 FAQ)를 참조하세요. 또는 추가 질문이나 의견은 opencode@microsoft.com으로 문의하세요.