次の方法で共有


Python 用 Azure Schema Registry Avro Serializer クライアント ライブラリ - バージョン 1.0.0b4

Azure Schema Registry は、スキーマ ストレージ、バージョン管理、管理を提供する、Azure Event Hubsによってホストされるスキーマ リポジトリ サービスです。 このパッケージは、スキーマ レジストリ スキーマ識別子と Avro でエンコードされたデータを含むペイロードをシリアル化および逆シリアル化できる Avro シリアライザーを提供します。

ソースコード | パッケージ (PyPi) | API リファレンス ドキュメント | サンプル | Changelog

免責事項

Python 2.7 に対する Azure SDK Python パッケージのサポートは、2022 年 1 月 1 日に終了します。 詳細と質問については、https://github.com/Azure/azure-sdk-for-python/issues/20691 を参照してください

作業の開始

パッケージをインストールする

pip を使用して、Python 用の Azure Schema Registry Avro Serializer クライアント ライブラリと Azure Identity クライアント ライブラリをインストールします。

pip install azure-schemaregistry-avroserializer azure-identity

前提条件:

このパッケージを使用するには、次が必要です。

クライアントを認証する

スキーマ レジストリ Avro シリアライザーとの対話は、まず AvroSerializer クラスのインスタンスから始まり、スキーマ グループ名と スキーマ レジストリ クライアント クラスを受け取ります。 クライアント コンストラクターは、Event Hubs の完全修飾名前空間と Azure Active Directory 資格情報を受け取ります。

  • スキーマ レジストリ インスタンスの完全修飾名前空間は、 の形式 <yournamespace>.servicebus.windows.netに従う必要があります。

  • TokenCredential プロトコルを実装する AAD 資格情報をコンストラクターに渡す必要があります。 azure-identity パッケージで使用できるプロトコルの実装TokenCredentialがあります。 で azure-identity提供される資格情報の種類を使用するには、 pip を使用して Python 用の Azure Identity クライアント ライブラリをインストールしてください。

pip install azure-identity
  • さらに、Python 3.6 以降でサポートされている非同期 API を使用するには、まず 、aiohttp などの非同期トランスポートをインストールする必要があります。
pip install aiohttp

azure-schemaregistry ライブラリを使用して AvroSerializer を作成します。

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

主要な概念

AvroSerializer

Avro Binary Encoding にシリアル化して逆シリアル化する API と、スキーマ ID を持つヘッダーを提供します。 SchemaRegistryClient を使用して、スキーマコンテンツからスキーマ ID を取得するか、またはその逆を行います。

メッセージの形式

同じ形式が、Azure SDK 言語全体のスキーマ レジストリ シリアライザーで使用されます。

メッセージは次のようにエンコードされます。

  • 4 バイト: 書式インジケーター

    • 現在、以下の形式を示す場合は常に 0 です。
  • 32 バイト: スキーマ ID

    • GUID の UTF-8 16 進数表現。
    • 32 桁の 16 進数、ハイフンなし。
    • スキーマ レジストリ サービスの文字列と同じ形式とバイト順。
  • 残りのバイト数: Avro ペイロード (一般に、形式固有のペイロード)

    • Avro バイナリ エンコード
    • NOT Avro Object Container File。スキーマを含み、このシリアル化の目的を打ち負かして、メッセージ ペイロードからスキーマをスキーマ レジストリに移動します。

次のセクションでは、次のような最も一般的なスキーマ レジストリ タスクの一部をカバーするいくつかのコード スニペットを示します。

シリアル化

メソッドを使用して AvroSerializer.serialize 、指定された avro スキーマを使用して dict データをシリアル化します。 メソッドは、以前にスキーマ レジストリ サービスに登録されたスキーマを使用し、将来のシリアル化の使用のためにスキーマをキャッシュに保持します。 また、 キーワード引数 auto_register_schemas=Trueを使用して をインスタンス化AvroSerializerすることで、サービスにスキーマを事前に登録したり、 メソッドに自動的に登録serializeしたりしないようにすることもできます。

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    encoded_bytes = serializer.serialize(dict_data, schema=definition)

逆シリアル化

メソッドを使用して AvroSerializer.deserialize 、生バイトを dict データに逆シリアル化します。 メソッドは、スキーマ レジストリ サービスからスキーマを自動的に取得し、将来の逆シリアル化の使用のためにスキーマをキャッシュに保持します。

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
    decoded_data = serializer.deserialize(encoded_bytes)

統合を送信する Event Hubs

Event Hubs との統合により、シリアル化された avro dict データを EventData の本文として送信します。

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
    event_data_batch = eventhub_producer.create_batch()
    dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
    event_data_batch.add(EventData(body=payload_bytes))
    eventhub_producer.send_batch(event_data_batch)

統合を受け取る Event Hubs

Event Hubs と統合して、未加工バイトを受信EventDataして avro dict データに逆シリアル化します。

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    bytes_payload = b"".join(b for b in event.body)
    deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

トラブルシューティング

全般

Azure Schema Registry Avro Serializer では、 Azure Core で定義されている例外が発生します。

ログの記録

このライブラリでは、ログ記録に標準 のログ ライブラリが使用されます。 HTTP セッションに関する基本情報 (URL、ヘッダーなど) は INFO レベルでログに記録されます。

要求/応答本文、未変換ヘッダーなど、詳細な DEBUG レベルのログは、 引数を使用してクライアントで logging_enable 有効にすることができます。

import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")

同様に、logging_enable は、詳細なログ記録がクライアントで有効になっていない場合でも、1 回の操作のために有効にすることができます。

serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)

次のステップ

その他のサンプル コード

サンプル ディレクトリで 、Azure Schema Registry Avro Serializer の一般的なシナリオを示すその他の例を参照してください。

共同作成

このプロジェクトでは、共同作成と提案を歓迎しています。 ほとんどの共同作成では、共同作成者使用許諾契約書 (CLA) にご同意いただき、ご自身の共同作成内容を使用する権利を Microsoft に供与する権利をお持ちであり、かつ実際に供与することを宣言していただく必要があります。 詳細については、 https://cla.microsoft.com を参照してください。

pull request を送信すると、CLA を提供して PR (ラベル、コメントなど) を適宜装飾する必要があるかどうかを CLA ボットが自動的に決定します。 ボットによって提供される手順にそのまま従ってください。 この操作は、Microsoft の CLA を使用するすべてのリポジトリについて、1 回だけ行う必要があります。

このプロジェクトでは、Microsoft オープン ソースの倫理規定を採用しています。 詳しくは、「Code of Conduct FAQ (倫理規定についてよくある質問)」を参照するか、opencode@microsoft.com 宛てに質問またはコメントをお送りください。