Bagikan melalui


Pustaka klien Azure Schema Registry Avro Serializer untuk Python - versi 1.0.0b4

Azure Schema Registry adalah layanan repositori skema yang dihosting oleh Azure Event Hubs, menyediakan penyimpanan skema, penerapan versi, dan manajemen. Paket ini menyediakan serializer Avro yang mampu menserialisasikan dan mendeserialisasi payload yang berisi pengidentifikasi skema Schema Registry dan data yang dikodekan Avro.

Kode sumber | Paket (PyPi) | Dokumentasi | referensi API Sampel | Changelog

Pengelakan

Dukungan paket Azure SDK Python untuk Python 2.7 berakhir pada 01 Januari 2022. Untuk informasi dan pertanyaan lebih lanjut, silakan merujuk ke https://github.com/Azure/azure-sdk-for-python/issues/20691

Memulai

Menginstal paket

Instal pustaka klien Azure Schema Registry Avro Serializer dan pustaka klien Azure Identity untuk Python dengan pip:

pip install azure-schemaregistry-avroserializer azure-identity

Prasyarat:

Untuk menggunakan paket ini, Anda harus memiliki:

Mengautentikasi klien

Interaksi dengan Schema Registry Avro Serializer dimulai dengan instans kelas AvroSerializer, yang mengambil nama grup skema dan kelas Klien Schema Registry . Konstruktor klien mengambil namespace layanan yang sepenuhnya memenuhi syarat Azure Event Hubs dan kredensial Azure Active Directory:

  • Namespace yang sepenuhnya memenuhi syarat dari instans Schema Registry harus mengikuti format: <yournamespace>.servicebus.windows.net.

  • Kredensial AAD yang mengimplementasikan protokol TokenCredential harus diteruskan ke konstruktor. Ada implementasi protokol yang TokenCredential tersedia dalam paket azure-identity. Untuk menggunakan jenis kredensial yang disediakan oleh azure-identity, instal pustaka klien Azure Identity untuk Python dengan pip:

pip install azure-identity
  • Selain itu, untuk menggunakan API asinkron yang didukung pada Python 3.6+, Anda harus terlebih dahulu menginstal transportasi asinkron, seperti aiohttp:
pip install aiohttp

Buat AvroSerializer menggunakan pustaka azure-schemaregistry:

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

Konsep utama

AvroSerializer

Menyediakan API untuk menserialisasikan dan mendeserialisasi dari Pengodean Biner Avro ditambah header dengan ID skema. Menggunakan SchemaRegistryClient untuk mendapatkan ID skema dari konten skema atau sebaliknya.

Format pesan

Format yang sama digunakan oleh serializer registri skema di seluruh bahasa Azure SDK.

Pesan dikodekan sebagai berikut:

  • 4 byte: Format Indikator

    • Saat ini selalu nol untuk menunjukkan format di bawah ini.
  • 32 byte: ID Skema

    • Representasi heksadesimal UTF-8 dari GUID.
    • 32 digit hex, tanpa tanda hubung.
    • Format dan urutan byte yang sama dengan string dari layanan Schema Registry.
  • Byte yang tersisa: Payload Avro (secara umum, payload khusus format)

    • Pengodean Biner Avro
    • BUKAN File Kontainer Objek Avro, yang mencakup skema dan mengalahkan tujuan serialzer ini untuk memindahkan skema keluar dari payload pesan dan ke registri skema.

Contoh

Bagian berikut ini menyediakan beberapa cuplikan kode yang mencakup beberapa tugas Schema Registry yang paling umum, termasuk:

Serialisasi

Gunakan AvroSerializer.serialize metode untuk menserialisasikan data dict dengan skema avro yang diberikan. Metode ini akan menggunakan skema yang sebelumnya terdaftar ke layanan Schema Registry dan menjaga skema tetap di-cache untuk penggunaan serialisasi di masa mendatang. Dimungkinkan juga untuk menghindari pra-pendaftaran skema ke layanan dan secara otomatis mendaftar dengan metode dengan serialize membuat AvroSerializer instans dengan argumen auto_register_schemas=Truekata kunci .

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    encoded_bytes = serializer.serialize(dict_data, schema=definition)

Deserialisasi

Gunakan AvroSerializer.deserialize metode untuk mendeserialisasi byte mentah ke dalam data dict. Metode ini secara otomatis mengambil skema dari Layanan Registri Skema dan menjaga skema tetap di-cache untuk penggunaan deserialisasi di masa mendatang.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
    decoded_data = serializer.deserialize(encoded_bytes)

Integrasi Pengiriman Azure Event Hubs

Integrasi dengan Azure Event Hubs untuk mengirim data dict avro berseri sebagai isi EventData.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
    event_data_batch = eventhub_producer.create_batch()
    dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
    event_data_batch.add(EventData(body=payload_bytes))
    eventhub_producer.send_batch(event_data_batch)

Integrasi Penerimaan Azure Event Hubs

Integrasi dengan Azure Event Hubs untuk menerima EventData dan mendeserialisasi byte mentah ke dalam data dict avro.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    bytes_payload = b"".join(b for b in event.body)
    deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Pemecahan Masalah

Umum

Azure Schema Registry Avro Serializer meningkatkan pengecualian yang ditentukan dalam Azure Core.

Pembuatan Log

Pustaka ini menggunakan pustaka pengelogan standar untuk pengelogan. Informasi dasar tentang sesi HTTP (URL, header, dll.) dicatat di tingkat INFO.

Pengelogan tingkat DEBUG terperinci, termasuk isi permintaan/respons dan header yang tidak diredaktifkan, dapat diaktifkan pada klien dengan logging_enable argumen :

import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")

Demikian pula, logging_enable dapat mengaktifkan pengelogan mendetail untuk satu operasi, meskipun tidak diaktifkan untuk klien:

serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)

Langkah berikutnya

Lebih banyak kode sampel

Silakan temukan contoh lebih lanjut dalam direktori sampel yang menunjukkan skenario Umum Azure Schema Registry Avro Serializer.

Berkontribusi

Proyek ini menyambut baik kontribusi dan saran. Sebagian besar kontribusi mengharuskan Anda menyetujui Perjanjian Lisensi Kontributor (CLA) yang menyatakan bahwa Anda memiliki hak untuk, dan benar-benar melakukannya, memberi kami hak untuk menggunakan kontribusi Anda. Untuk detailnya, kunjungi https://cla.microsoft.com.

Ketika Anda mengirimkan permintaan tarik, CLA-bot akan secara otomatis menentukan apakah Anda perlu memberikan CLA dan menghias PR dengan tepat (misalnya, label, komentar). Cukup ikuti instruksi yang diberikan oleh bot. Anda hanya perlu melakukan ini sekali di semua repos menggunakan CLA kami.

Proyek ini telah mengadopsi Kode Etik Sumber Terbuka Microsoft. Untuk informasi selengkapnya, lihat Tanya Jawab Umum Tata Tertib atau hubungi opencode@microsoft.com untuk pertanyaan atau komentar lainnya.