你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 Python 的 Azure 架构注册表 Avro 序列化程序客户端库 - 版本 1.0.0b4

Azure 架构注册表是由 Azure 事件中心 托管的架构存储库服务,提供架构存储、版本控制和管理。 此包提供 Avro 序列化程序,能够序列化和反序列化包含架构注册表架构标识符和 Avro 编码数据的有效负载。

源代码 | 包 (PyPi) | API 参考文档 | 样品 | 更改日志

免责声明

适用于 Python 2.7 的 Azure SDK Python 包支持在 2022 年 1 月 1 日结束。 有关详细信息和问题,请参阅 https://github.com/Azure/azure-sdk-for-python/issues/20691

入门

安装包

使用 pip 安装适用于 Python 的 Azure 架构注册表 Avro Serializer 客户端库和 Azure 标识客户端库:

pip install azure-schemaregistry-avroserializer azure-identity

先决条件:

若要使用此包,必须具有:

验证客户端

与架构注册表 Avro Serializer 的交互从 AvroSerializer 类的实例开始,该类采用架构组名称和 架构注册表客户端 类。 客户端构造函数采用事件中心完全限定的命名空间和 Azure Active Directory 凭据:

  • 架构注册表实例的完全限定命名空间应采用以下格式: <yournamespace>.servicebus.windows.net

  • 应将实现 TokenCredential 协议的 AAD 凭据传递给构造函数。 azure-identity 包中提供了协议的实现TokenCredential。 若要使用 提供的 azure-identity凭据类型,请使用 pip 安装适用于 Python 的 Azure 标识客户端库:

pip install azure-identity
  • 此外,若要使用 Python 3.6+ 上支持的异步 API,必须先安装异步传输,例如 aiohttp
pip install aiohttp

使用 azure-schemaregistry 库创建 AvroSerializer:

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

关键概念

AvroSerializer

提供 API 以序列化为 Avro 二进制编码并从中反序列化,以及具有架构 ID 的标头。 使用 SchemaRegistryClient 从架构内容获取架构 ID,反之亦然。

消息格式

跨 Azure SDK 语言的架构注册表序列化程序使用相同的格式。

消息的编码方式如下:

  • 4 字节:格式指示器

    • 当前始终为零以指示以下格式。
  • 32 字节:架构 ID

    • GUID 的 UTF-8 十六进制表示形式。
    • 32 个十六进制数字,无连字符。
    • 格式和字节顺序与架构注册表服务中字符串相同。
  • 剩余字节:Avro 有效负载 (常规、特定于格式的有效负载)

    • Avro 二进制编码
    • NOT Avro 对象容器文件,该文件包含架构,不符合此序列化器将架构移出消息有效负载并移入架构注册表的目的。

示例

以下部分提供了几个代码片段,涵盖了一些最常见的架构注册表任务,包括:

序列化

使用 AvroSerializer.serialize 方法使用给定的 avro 架构序列化 dict 数据。 方法将使用以前注册到架构注册表服务的架构,并保持架构缓存以供将来序列化使用。 还可以避免将架构预先注册到服务,并通过使用关键字参数 auto_register_schemas=True实例化 AvroSerializer 自动注册serialize到 方法。

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    encoded_bytes = serializer.serialize(dict_data, schema=definition)

反序列化

使用 AvroSerializer.deserialize 方法将原始字节反序列化为 dict 数据。 方法自动从架构注册表服务检索架构,并保持架构缓存以供将来反序列化使用。

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
    decoded_data = serializer.deserialize(encoded_bytes)

事件中心发送集成

事件中心 集成,将序列化的 avro dict 数据作为 EventData 的正文发送。

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
    event_data_batch = eventhub_producer.create_batch()
    dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
    event_data_batch.add(EventData(body=payload_bytes))
    eventhub_producer.send_batch(event_data_batch)

事件中心接收集成

事件中心 集成,以接收 EventData 原始字节并将其反序列化为 avro dict 数据。

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    bytes_payload = b"".join(b for b in event.body)
    deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

疑难解答

常规

Azure 架构注册表 Avro Serializer 会引发 Azure Core 中定义的异常。

日志记录

此库使用标准 日志记录 库进行日志记录。 有关 HTTP 会话 (URL、标头等的基本信息,) 在 INFO 级别记录。

可以使用 参数在客户端 logging_enable 上启用详细的调试级别日志记录,包括请求/响应正文和未处理标头:

import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")

同样,即使没有为客户端启用详细日志记录,logging_enable 也可以为单个操作启用:

serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)

后续步骤

更多示例代码

请在演示常见 Azure 架构注册表 Avro 序列化程序方案 的示例 目录中查找更多示例。

贡献

本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 https://cla.microsoft.com

提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。

此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。