Pustaka klien Azure Schema Registry Avro Encoder untuk Python - versi 1.0.0
Azure Schema Registry adalah layanan repositori skema yang dihosting oleh Azure Event Hubs, menyediakan penyimpanan, penerapan versi, dan manajemen skema. Paket ini menyediakan encoder Avro yang mampu mengodekan dan mendekode payload yang berisi pengidentifikasi skema Schema Registry dan konten yang dikodekan Avro.
Kode sumber | Paket (PyPi) | Dokumentasi | referensi API Sampel | Changelog
Pengelakan
Dukungan paket Azure SDK Python untuk Python 2.7 telah berakhir 01 Januari 2022. Untuk informasi lebih lanjut dan pertanyaan, silakan merujuk ke https://github.com/Azure/azure-sdk-for-python/issues/20691
Memulai
Menginstal paket
Instal pustaka klien Azure Schema Registry Avro Encoder untuk Python dengan pip:
pip install azure-schemaregistry-avroencoder
Prasyarat:
Untuk menggunakan paket ini, Anda harus memiliki:
- Langganan Azure - Membuat akun gratis
- Azure Schema Registry - Berikut adalah panduan mulai cepat untuk membuat grup Schema Registry menggunakan portal Azure.
- Python 3.6 atau yang lebih baru - Instal Python
Mengautentikasi klien
Interaksi dengan Schema Registry Avro Encoder dimulai dengan instans kelas AvroEncoder, yang mengambil nama grup skema dan kelas Klien Schema Registry . Konstruktor klien mengambil namespace layanan Azure Event Hubs yang sepenuhnya memenuhi syarat dan kredensial Azure Active Directory:
Namespace yang sepenuhnya memenuhi syarat dari instans Schema Registry harus mengikuti format:
<yournamespace>.servicebus.windows.net
.Kredensial AAD yang mengimplementasikan protokol TokenCredential harus diteruskan ke konstruktor. Ada implementasi protokol yang
TokenCredential
tersedia dalam paket identitas azure. Untuk menggunakan jenis kredensial yang disediakan olehazure-identity
, instal pustaka klien Azure Identity untuk Python dengan pip:
pip install azure-identity
- Selain itu, untuk menggunakan API asinkron, Anda harus terlebih dahulu menginstal transportasi asinkron, seperti aiohttp:
pip install aiohttp
Buat AvroEncoder menggunakan pustaka azure-schemaregistry:
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Konsep utama
AvroEncoder
Menyediakan API untuk mengodekan dan mendekode dari Pengodean Biner Avro ditambah jenis konten dengan ID skema. Menggunakan SchemaRegistryClient untuk mendapatkan ID skema dari konten skema atau sebaliknya.
Model pesan yang didukung
Dukungan telah ditambahkan ke kelas model Azure Messaging SDK tertentu untuk interoperabilitas dengan AvroEncoder
. Model-model ini adalah subjenis protokol yang MessageType
ditentukan di azure.schemaregistry.encoder.avroencoder
bawah namespace layanan. Saat ini, kelas model yang didukung adalah:
azure.eventhub.EventData
untukazure-eventhub>=5.9.0
Format pesan
Jika jenis pesan yang mengikuti protokol MessageType disediakan ke encoder untuk pengodean, itu akan mengatur konten terkait dan properti tipe konten, di mana:
content
: Payload Avro (secara umum, payload khusus format)- Pengodean Biner Avro
- BUKAN File Kontainer Objek Avro, yang mencakup skema dan mengalahkan tujuan encoder ini untuk memindahkan skema keluar dari payload pesan dan ke registri skema.
content type
: string formatavro/binary+<schema ID>
, di mana:avro/binary
adalah indikator format<schema ID>
adalah representasi heksadesimal GUID, format dan urutan byte yang sama dengan string dari layanan Schema Registry.
Jika EventData
diteruskan sebagai jenis pesan, properti berikut akan diatur pada EventData
objek :
- Properti
body
akan diatur ke nilai konten. - Properti
content_type
akan diatur ke nilai tipe isi.
Jika jenis pesan tidak disediakan, dan secara default, encoder akan membuat dict berikut: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
Contoh
Bagian berikut ini menyediakan beberapa cuplikan kode yang mencakup beberapa tugas Schema Registry yang paling umum, termasuk:
Pengodean
AvroEncoder.encode
Gunakan metode untuk mengodekan konten dengan skema Avro yang diberikan.
Metode ini akan menggunakan skema yang sebelumnya terdaftar ke layanan Schema Registry dan menjaga skema tetap di-cache untuk penggunaan pengodean di masa mendatang. Untuk menghindari pra-pendaftaran skema ke layanan dan secara otomatis mendaftarkannya dengan encode
metode , argumen auto_register=True
kata kunci harus diteruskan ke AvroEncoder
konstruktor.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Decoding
AvroEncoder.decode
Gunakan metode untuk mendekode konten yang dikodekan Avro dengan:
- Meneruskan objek pesan yang merupakan subjenis protokol MessageType.
- Meneruskan dict dengan kunci
content
(ketik byte) dancontent_type
(jenis string). Metode ini secara otomatis mengambil skema dari Layanan Registri Skema dan menjaga skema tetap di-cache untuk penggunaan pendekodean di masa mendatang.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Integrasi Pengiriman Azure Event Hubs
Integrasi dengan Azure Event Hubs untuk mengirim EventData
objek dengan body
diatur ke konten yang dikodekan Avro dan yang content_type
sesuai.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Integrasi Penerimaan Azure Event Hubs
Integrasi dengan Azure Event Hubs untuk menerima EventData
objek dan mendekode nilai yang dikodekan body
Avro.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Pemecahan Masalah
Umum
Azure Schema Registry Avro Encoder memunculkan pengecualian yang ditentukan di Azure Core jika terjadi kesalahan saat berkomunikasi dengan layanan Schema Registry. Kesalahan yang terkait dengan jenis konten/konten yang tidak valid dan skema yang tidak valid akan dimunculkan sebagai azure.schemaregistry.encoder.avroencoder.InvalidContentError
dan azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
, masing-masing, di mana __cause__
akan berisi pengecualian yang mendasar yang dimunculkan oleh pustaka Apache Avro.
Pembuatan Log
Pustaka ini menggunakan pustaka pengelogan standar untuk pengelogan. Informasi dasar tentang sesi HTTP (URL, header, dll.) dicatat di tingkat INFO.
Pengelogan tingkat DEBUG terperinci, termasuk isi permintaan/respons dan header yang tidak diredaksikan, dapat diaktifkan pada klien dengan logging_enable
argumen :
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Demikian pula, logging_enable
dapat mengaktifkan pengelogan mendetail untuk satu operasi, meskipun tidak diaktifkan untuk klien:
encoder.encode(dict_content, schema=definition, logging_enable=True)
Langkah berikutnya
Lebih banyak kode sampel
Contoh lebih lanjut yang menunjukkan skenario Umum Azure Schema Registry Avro Encoder ada di direktori sampel .
Berkontribusi
Proyek ini menyambut baik kontribusi dan saran. Sebagian besar kontribusi mengharuskan Anda menyetujui Perjanjian Lisensi Kontributor (CLA) yang menyatakan bahwa Anda memiliki hak untuk, dan benar-benar melakukannya, memberi kami hak untuk menggunakan kontribusi Anda. Untuk detailnya, kunjungi https://cla.microsoft.com.
Ketika Anda mengirimkan permintaan tarik, CLA-bot akan secara otomatis menentukan apakah Anda perlu memberikan CLA dan menghias PR dengan tepat (misalnya, label, komentar). Cukup ikuti instruksi yang diberikan oleh bot. Anda hanya perlu melakukan ini sekali di semua repos menggunakan CLA kami.
Proyek ini telah mengadopsi Kode Etik Sumber Terbuka Microsoft. Untuk informasi selengkapnya, lihat Tanya Jawab Umum Tata Tertib atau hubungi opencode@microsoft.com untuk pertanyaan atau komentar lainnya.
Azure SDK for Python
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk