Python 用 Azure Schema Registry Avro Encoder クライアント ライブラリ - バージョン 1.0.0

Azure Schema Registry は、スキーマ ストレージ、バージョン管理、管理を提供する、Azure Event Hubsによってホストされるスキーマ リポジトリ サービスです。 このパッケージは、スキーマ レジストリ スキーマ識別子と Avro でエンコードされたコンテンツを含むペイロードをエンコードおよびデコードできる Avro エンコーダーを提供します。

ソースコード | パッケージ (PyPi) | API リファレンス ドキュメント | サンプル | Changelog

免責事項

Python 2.7 の Azure SDK Python パッケージのサポートは、2022 年 1 月 1 日に終了しました。 詳細と質問については、https://github.com/Azure/azure-sdk-for-python/issues/20691 を参照してください

作業の開始

パッケージをインストールする

pip を使用して Python 用 Azure Schema Registry Avro Encoder クライアント ライブラリをインストールします。

pip install azure-schemaregistry-avroencoder

前提条件:

このパッケージを使用するには、次が必要です。

クライアントを認証する

スキーマ レジストリ Avro Encoder との対話は、まず AvroEncoder クラスのインスタンスから始まり、スキーマ グループ名と スキーマ レジストリ クライアント クラスを受け取ります。 クライアント コンストラクターは、Event Hubs の完全修飾名前空間と Azure Active Directory 資格情報を受け取ります。

  • スキーマ レジストリ インスタンスの完全修飾名前空間は、 の形式 <yournamespace>.servicebus.windows.netに従う必要があります。

  • TokenCredential プロトコルを実装する AAD 資格情報をコンストラクターに渡す必要があります。 azure-identity パッケージで使用できるプロトコルの実装TokenCredentialがあります。 で azure-identity提供される資格情報の種類を使用するには、 pip を使用して Python 用の Azure Identity クライアント ライブラリをインストールしてください。

pip install azure-identity
  • さらに、非同期 API を使用するには、まず、 aiohttp などの非同期トランスポートをインストールする必要があります。
pip install aiohttp

azure-schemaregistry ライブラリを使用して AvroEncoder を作成します。

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

主要な概念

AvroEncoder

Avro Binary Encoding にエンコードしてデコードする API と、スキーマ ID を持つコンテンツ タイプを提供します。 SchemaRegistryClient を使用して、スキーマコンテンツからスキーマ ID を取得するか、またはその逆を行います。

サポートされているメッセージ モデル

との相互運用性のために、特定の Azure Messaging SDK モデル クラスにサポートが AvroEncoder追加されました。 これらのモデルは、 名前空間で定義されている MessageType プロトコルの azure.schemaregistry.encoder.avroencoder サブタイプです。 現在、サポートされているモデル クラスは次のとおりです。

  • azure-eventhub>=5.9.0azure.eventhub.EventData

メッセージの形式

MessageType プロトコルに従うメッセージの種類をエンコード用にエンコーダーに提供すると、対応するコンテンツとコンテンツ タイプのプロパティが設定されます。ここで、次のようになります。

  • content: Avro ペイロード (一般に、形式固有のペイロード)

    • Avro バイナリ エンコード
    • NOT Avro Object Container File。スキーマが含まれており、メッセージ ペイロードからスキーマをスキーマ レジストリに移動するために、このエンコーダーの目的を無効にします。
  • content type: という形式 avro/binary+<schema ID>の文字列。ここで、

    • avro/binary は書式インジケーターです
    • <schema ID> は、GUID の 16 進数表現で、スキーマ レジストリ サービスの文字列と同じ形式とバイト順です。

がメッセージの種類として渡された場合 EventData 、オブジェクトに次のプロパティが EventData 設定されます。

  • プロパティは body コンテンツ値に設定されます。
  • プロパティは content_type コンテンツ タイプの値に設定されます。

メッセージの種類が指定されていない場合、エンコーダーは既定で次の dict を作成します。 {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

次のセクションでは、次のような最も一般的なスキーマ レジストリ タスクの一部をカバーするいくつかのコード スニペットを示します。

エンコード

メソッドを AvroEncoder.encode 使用して、指定された Avro スキーマでコンテンツをエンコードします。 メソッドは、以前にスキーマ レジストリ サービスに登録されたスキーマを使用し、後でエンコードを使用するためにスキーマをキャッシュに保持します。 スキーマをサービスに事前登録し、 メソッドに自動的に登録 encode しないようにするには、キーワード引数 auto_register=True をコンストラクターに AvroEncoder 渡す必要があります。

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

デコード

メソッドを AvroEncoder.decode 使用して、Avro でエンコードされたコンテンツを次のいずれかの方法でデコードします。

  • MessageType プロトコルのサブタイプであるメッセージ オブジェクトを渡します。
  • キー content(型バイト) と (型文字列) を使用して content_type dict を渡します。 メソッドは、スキーマ レジストリ サービスからスキーマを自動的に取得し、将来のデコードの使用のためにスキーマをキャッシュに保持します。
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

統合を送信する Event Hubs

Event Hubs と統合して、EventDataAvro でエンコードされたコンテンツとbody対応する に設定された オブジェクトを送信しますcontent_type

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

統合を受け取る Event Hubs

Event Hubs と統合してオブジェクトをEventData受信し、Avro でエンコードされたbody値をデコードします。

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

トラブルシューティング

全般

Azure Schema Registry Avro Encoder は、スキーマ レジストリ サービスとの通信時にエラーが発生した場合に 、Azure Core で定義されている例外を発生させます。 無効なコンテンツ/コンテンツ タイプと無効なスキーマに関連するエラーはそれぞれ と azure.schemaregistry.encoder.avroencoder.InvalidSchemaErrorとしてazure.schemaregistry.encoder.avroencoder.InvalidContentError発生します。このエラー__cause__には、Apache Avro ライブラリによって発生した基になる例外が含まれます。

ログの記録

このライブラリでは、ログ記録に標準 のログ ライブラリが使用されます。 HTTP セッションに関する基本情報 (URL、ヘッダーなど) は INFO レベルでログに記録されます。

要求/応答本文、未変換ヘッダーなど、詳細な DEBUG レベルのログは、 引数を使用してクライアントで logging_enable 有効にすることができます。

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

同様に、logging_enable は、詳細なログ記録がクライアントで有効になっていない場合でも、1 回の操作のために有効にすることができます。

encoder.encode(dict_content, schema=definition, logging_enable=True)

次のステップ

その他のサンプル コード

一般的な Azure Schema Registry Avro Encoder シナリオを示すその他の例については、 サンプル ディレクトリを参照してください。

共同作成

このプロジェクトでは、共同作成と提案を歓迎しています。 ほとんどの共同作成では、共同作成者使用許諾契約書 (CLA) にご同意いただき、ご自身の共同作成内容を使用する権利を Microsoft に供与する権利をお持ちであり、かつ実際に供与することを宣言していただく必要があります。 詳細については、 https://cla.microsoft.com を参照してください。

pull request を送信すると、CLA を提供して PR (ラベル、コメントなど) を適宜装飾する必要があるかどうかを CLA ボットが自動的に決定します。 ボットによって提供される手順にそのまま従ってください。 この操作は、Microsoft の CLA を使用するすべてのリポジトリについて、1 回だけ行う必要があります。

このプロジェクトでは、Microsoft オープン ソースの倫理規定を採用しています。 詳しくは、「Code of Conduct FAQ (倫理規定についてよくある質問)」を参照するか、opencode@microsoft.com 宛てに質問またはコメントをお送りください。