Pustaka klien Azure Schema Registry Avro Serializer untuk Python - versi 1.0.0b4
Azure Schema Registry adalah layanan repositori skema yang dihosting oleh Azure Event Hubs, menyediakan penyimpanan skema, penerapan versi, dan manajemen. Paket ini menyediakan serializer Avro yang mampu menserialisasikan dan mendeserialisasi payload yang berisi pengidentifikasi skema Schema Registry dan data yang dikodekan Avro.
Kode sumber | Paket (PyPi) | Dokumentasi | referensi API Sampel | Changelog
Pengelakan
Dukungan paket Azure SDK Python untuk Python 2.7 berakhir pada 01 Januari 2022. Untuk informasi dan pertanyaan lebih lanjut, silakan merujuk ke https://github.com/Azure/azure-sdk-for-python/issues/20691
Memulai
Menginstal paket
Instal pustaka klien Azure Schema Registry Avro Serializer dan pustaka klien Azure Identity untuk Python dengan pip:
pip install azure-schemaregistry-avroserializer azure-identity
Prasyarat:
Untuk menggunakan paket ini, Anda harus memiliki:
- Langganan Azure - Membuat akun gratis
- Azure Schema Registry
- Python 2.7, 3.6 atau yang lebih baru - Instal Python
Mengautentikasi klien
Interaksi dengan Schema Registry Avro Serializer dimulai dengan instans kelas AvroSerializer, yang mengambil nama grup skema dan kelas Klien Schema Registry . Konstruktor klien mengambil namespace layanan yang sepenuhnya memenuhi syarat Azure Event Hubs dan kredensial Azure Active Directory:
Namespace yang sepenuhnya memenuhi syarat dari instans Schema Registry harus mengikuti format:
<yournamespace>.servicebus.windows.net
.Kredensial AAD yang mengimplementasikan protokol TokenCredential harus diteruskan ke konstruktor. Ada implementasi protokol yang
TokenCredential
tersedia dalam paket azure-identity. Untuk menggunakan jenis kredensial yang disediakan olehazure-identity
, instal pustaka klien Azure Identity untuk Python dengan pip:
pip install azure-identity
- Selain itu, untuk menggunakan API asinkron yang didukung pada Python 3.6+, Anda harus terlebih dahulu menginstal transportasi asinkron, seperti aiohttp:
pip install aiohttp
Buat AvroSerializer menggunakan pustaka azure-schemaregistry:
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
Konsep utama
AvroSerializer
Menyediakan API untuk menserialisasikan dan mendeserialisasi dari Pengodean Biner Avro ditambah header dengan ID skema. Menggunakan SchemaRegistryClient untuk mendapatkan ID skema dari konten skema atau sebaliknya.
Format pesan
Format yang sama digunakan oleh serializer registri skema di seluruh bahasa Azure SDK.
Pesan dikodekan sebagai berikut:
4 byte: Format Indikator
- Saat ini selalu nol untuk menunjukkan format di bawah ini.
32 byte: ID Skema
- Representasi heksadesimal UTF-8 dari GUID.
- 32 digit hex, tanpa tanda hubung.
- Format dan urutan byte yang sama dengan string dari layanan Schema Registry.
Byte yang tersisa: Payload Avro (secara umum, payload khusus format)
- Pengodean Biner Avro
- BUKAN File Kontainer Objek Avro, yang mencakup skema dan mengalahkan tujuan serialzer ini untuk memindahkan skema keluar dari payload pesan dan ke registri skema.
Contoh
Bagian berikut ini menyediakan beberapa cuplikan kode yang mencakup beberapa tugas Schema Registry yang paling umum, termasuk:
- Serialisasi
- Deserialisasi
- Integrasi Pengiriman Azure Event Hubs
- Integrasi Penerimaan Azure Event Hubs
Serialisasi
Gunakan AvroSerializer.serialize
metode untuk menserialisasikan data dict dengan skema avro yang diberikan.
Metode ini akan menggunakan skema yang sebelumnya terdaftar ke layanan Schema Registry dan menjaga skema tetap di-cache untuk penggunaan serialisasi di masa mendatang. Dimungkinkan juga untuk menghindari pra-pendaftaran skema ke layanan dan secara otomatis mendaftar dengan metode dengan serialize
membuat AvroSerializer
instans dengan argumen auto_register_schemas=True
kata kunci .
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
encoded_bytes = serializer.serialize(dict_data, schema=definition)
Deserialisasi
Gunakan AvroSerializer.deserialize
metode untuk mendeserialisasi byte mentah ke dalam data dict.
Metode ini secara otomatis mengambil skema dari Layanan Registri Skema dan menjaga skema tetap di-cache untuk penggunaan deserialisasi di masa mendatang.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
decoded_data = serializer.deserialize(encoded_bytes)
Integrasi Pengiriman Azure Event Hubs
Integrasi dengan Azure Event Hubs untuk mengirim data dict avro berseri sebagai isi EventData.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_serializer:
event_data_batch = eventhub_producer.create_batch()
dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
event_data_batch.add(EventData(body=payload_bytes))
eventhub_producer.send_batch(event_data_batch)
Integrasi Penerimaan Azure Event Hubs
Integrasi dengan Azure Event Hubs untuk menerima EventData
dan mendeserialisasi byte mentah ke dalam data dict avro.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
bytes_payload = b"".join(b for b in event.body)
deserialized_data = avro_serializer.deserialize(bytes_payload)
with eventhub_consumer, avro_serializer:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Pemecahan Masalah
Umum
Azure Schema Registry Avro Serializer meningkatkan pengecualian yang ditentukan dalam Azure Core.
Pembuatan Log
Pustaka ini menggunakan pustaka pengelogan standar untuk pengelogan. Informasi dasar tentang sesi HTTP (URL, header, dll.) dicatat di tingkat INFO.
Pengelogan tingkat DEBUG terperinci, termasuk isi permintaan/respons dan header yang tidak diredaktifkan, dapat diaktifkan pada klien dengan logging_enable
argumen :
import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")
Demikian pula, logging_enable
dapat mengaktifkan pengelogan mendetail untuk satu operasi, meskipun tidak diaktifkan untuk klien:
serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)
Langkah berikutnya
Lebih banyak kode sampel
Silakan temukan contoh lebih lanjut dalam direktori sampel yang menunjukkan skenario Umum Azure Schema Registry Avro Serializer.
Berkontribusi
Proyek ini menyambut baik kontribusi dan saran. Sebagian besar kontribusi mengharuskan Anda menyetujui Perjanjian Lisensi Kontributor (CLA) yang menyatakan bahwa Anda memiliki hak untuk, dan benar-benar melakukannya, memberi kami hak untuk menggunakan kontribusi Anda. Untuk detailnya, kunjungi https://cla.microsoft.com.
Ketika Anda mengirimkan permintaan tarik, CLA-bot akan secara otomatis menentukan apakah Anda perlu memberikan CLA dan menghias PR dengan tepat (misalnya, label, komentar). Cukup ikuti instruksi yang diberikan oleh bot. Anda hanya perlu melakukan ini sekali di semua repos menggunakan CLA kami.
Proyek ini telah mengadopsi Kode Etik Sumber Terbuka Microsoft. Untuk informasi selengkapnya, lihat Tanya Jawab Umum Tata Tertib atau hubungi opencode@microsoft.com untuk pertanyaan atau komentar lainnya.
Azure SDK for Python
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk