Pustaka klien Azure Schema Registry Avro Encoder untuk Python - versi 1.0.0

Azure Schema Registry adalah layanan repositori skema yang dihosting oleh Azure Event Hubs, menyediakan penyimpanan, penerapan versi, dan manajemen skema. Paket ini menyediakan encoder Avro yang mampu mengodekan dan mendekode payload yang berisi pengidentifikasi skema Schema Registry dan konten yang dikodekan Avro.

Kode sumber | Paket (PyPi) | Dokumentasi | referensi API Sampel | Changelog

Pengelakan

Dukungan paket Azure SDK Python untuk Python 2.7 telah berakhir 01 Januari 2022. Untuk informasi lebih lanjut dan pertanyaan, silakan merujuk ke https://github.com/Azure/azure-sdk-for-python/issues/20691

Memulai

Menginstal paket

Instal pustaka klien Azure Schema Registry Avro Encoder untuk Python dengan pip:

pip install azure-schemaregistry-avroencoder

Prasyarat:

Untuk menggunakan paket ini, Anda harus memiliki:

Mengautentikasi klien

Interaksi dengan Schema Registry Avro Encoder dimulai dengan instans kelas AvroEncoder, yang mengambil nama grup skema dan kelas Klien Schema Registry . Konstruktor klien mengambil namespace layanan Azure Event Hubs yang sepenuhnya memenuhi syarat dan kredensial Azure Active Directory:

  • Namespace yang sepenuhnya memenuhi syarat dari instans Schema Registry harus mengikuti format: <yournamespace>.servicebus.windows.net.

  • Kredensial AAD yang mengimplementasikan protokol TokenCredential harus diteruskan ke konstruktor. Ada implementasi protokol yang TokenCredential tersedia dalam paket identitas azure. Untuk menggunakan jenis kredensial yang disediakan oleh azure-identity, instal pustaka klien Azure Identity untuk Python dengan pip:

pip install azure-identity
  • Selain itu, untuk menggunakan API asinkron, Anda harus terlebih dahulu menginstal transportasi asinkron, seperti aiohttp:
pip install aiohttp

Buat AvroEncoder menggunakan pustaka azure-schemaregistry:

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Konsep utama

AvroEncoder

Menyediakan API untuk mengodekan dan mendekode dari Pengodean Biner Avro ditambah jenis konten dengan ID skema. Menggunakan SchemaRegistryClient untuk mendapatkan ID skema dari konten skema atau sebaliknya.

Model pesan yang didukung

Dukungan telah ditambahkan ke kelas model Azure Messaging SDK tertentu untuk interoperabilitas dengan AvroEncoder. Model-model ini adalah subjenis protokol yang MessageType ditentukan di azure.schemaregistry.encoder.avroencoder bawah namespace layanan. Saat ini, kelas model yang didukung adalah:

  • azure.eventhub.EventData untuk azure-eventhub>=5.9.0

Format pesan

Jika jenis pesan yang mengikuti protokol MessageType disediakan ke encoder untuk pengodean, itu akan mengatur konten terkait dan properti tipe konten, di mana:

  • content: Payload Avro (secara umum, payload khusus format)

    • Pengodean Biner Avro
    • BUKAN File Kontainer Objek Avro, yang mencakup skema dan mengalahkan tujuan encoder ini untuk memindahkan skema keluar dari payload pesan dan ke registri skema.
  • content type: string format avro/binary+<schema ID>, di mana:

    • avro/binary adalah indikator format
    • <schema ID> adalah representasi heksadesimal GUID, format dan urutan byte yang sama dengan string dari layanan Schema Registry.

Jika EventData diteruskan sebagai jenis pesan, properti berikut akan diatur pada EventData objek :

  • Properti body akan diatur ke nilai konten.
  • Properti content_type akan diatur ke nilai tipe isi.

Jika jenis pesan tidak disediakan, dan secara default, encoder akan membuat dict berikut: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

Contoh

Bagian berikut ini menyediakan beberapa cuplikan kode yang mencakup beberapa tugas Schema Registry yang paling umum, termasuk:

Pengodean

AvroEncoder.encode Gunakan metode untuk mengodekan konten dengan skema Avro yang diberikan. Metode ini akan menggunakan skema yang sebelumnya terdaftar ke layanan Schema Registry dan menjaga skema tetap di-cache untuk penggunaan pengodean di masa mendatang. Untuk menghindari pra-pendaftaran skema ke layanan dan secara otomatis mendaftarkannya dengan encode metode , argumen auto_register=True kata kunci harus diteruskan ke AvroEncoder konstruktor.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

Decoding

AvroEncoder.decode Gunakan metode untuk mendekode konten yang dikodekan Avro dengan:

  • Meneruskan objek pesan yang merupakan subjenis protokol MessageType.
  • Meneruskan dict dengan kunci content(ketik byte) dan content_type (jenis string). Metode ini secara otomatis mengambil skema dari Layanan Registri Skema dan menjaga skema tetap di-cache untuk penggunaan pendekodean di masa mendatang.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Integrasi Pengiriman Azure Event Hubs

Integrasi dengan Azure Event Hubs untuk mengirim EventData objek dengan body diatur ke konten yang dikodekan Avro dan yang content_typesesuai.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Integrasi Penerimaan Azure Event Hubs

Integrasi dengan Azure Event Hubs untuk menerima EventData objek dan mendekode nilai yang dikodekan body Avro.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Pemecahan Masalah

Umum

Azure Schema Registry Avro Encoder memunculkan pengecualian yang ditentukan di Azure Core jika terjadi kesalahan saat berkomunikasi dengan layanan Schema Registry. Kesalahan yang terkait dengan jenis konten/konten yang tidak valid dan skema yang tidak valid akan dimunculkan sebagai azure.schemaregistry.encoder.avroencoder.InvalidContentError dan azure.schemaregistry.encoder.avroencoder.InvalidSchemaError, masing-masing, di mana __cause__ akan berisi pengecualian yang mendasar yang dimunculkan oleh pustaka Apache Avro.

Pembuatan Log

Pustaka ini menggunakan pustaka pengelogan standar untuk pengelogan. Informasi dasar tentang sesi HTTP (URL, header, dll.) dicatat di tingkat INFO.

Pengelogan tingkat DEBUG terperinci, termasuk isi permintaan/respons dan header yang tidak diredaksikan, dapat diaktifkan pada klien dengan logging_enable argumen :

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Demikian pula, logging_enable dapat mengaktifkan pengelogan mendetail untuk satu operasi, meskipun tidak diaktifkan untuk klien:

encoder.encode(dict_content, schema=definition, logging_enable=True)

Langkah berikutnya

Lebih banyak kode sampel

Contoh lebih lanjut yang menunjukkan skenario Umum Azure Schema Registry Avro Encoder ada di direktori sampel .

Berkontribusi

Proyek ini menyambut baik kontribusi dan saran. Sebagian besar kontribusi mengharuskan Anda menyetujui Perjanjian Lisensi Kontributor (CLA) yang menyatakan bahwa Anda memiliki hak untuk, dan benar-benar melakukannya, memberi kami hak untuk menggunakan kontribusi Anda. Untuk detailnya, kunjungi https://cla.microsoft.com.

Ketika Anda mengirimkan permintaan tarik, CLA-bot akan secara otomatis menentukan apakah Anda perlu memberikan CLA dan menghias PR dengan tepat (misalnya, label, komentar). Cukup ikuti instruksi yang diberikan oleh bot. Anda hanya perlu melakukan ini sekali di semua repos menggunakan CLA kami.

Proyek ini telah mengadopsi Kode Etik Sumber Terbuka Microsoft. Untuk informasi selengkapnya, lihat Tanya Jawab Umum Tata Tertib atau hubungi opencode@microsoft.com untuk pertanyaan atau komentar lainnya.