Python için Azure Schema Registry Avro Kodlayıcısı istemci kitaplığı - sürüm 1.0.0
Azure Schema Registry, şema depolama, sürüm oluşturma ve yönetim sağlayan Azure Event Hubs tarafından barındırılan bir şema deposu hizmetidir. Bu paket, Şema Kayıt Defteri şema tanımlayıcılarını ve Avro ile kodlanmış içeriği içeren yükleri kodlama ve kod çözme özelliğine sahip bir Avro kodlayıcı sağlar.
Kaynak kodu | Paket (PyPi) | API başvuru belgeleri | Örnekleri | Changelog
Bildirim
Python 2.7 için Azure SDK Python paketleri desteği 01 Ocak 2022'de sona erdi. Daha fazla bilgi ve soru için lütfen https://github.com/Azure/azure-sdk-for-python/issues/20691
Başlarken
Paketi yükleme
Pip ile Python için Azure Schema Registry Avro Kodlayıcısı istemci kitaplığını yükleyin:
pip install azure-schemaregistry-avroencoder
Ön koşullar:
Bu paketi kullanmak için aşağıdakilere sahip olmanız gerekir:
- Azure aboneliği - Ücretsiz hesap oluşturma
- Azure Şema Kayıt Defteri - Azure portal kullanarak Şema Kayıt Defteri grubu oluşturmaya yönelik hızlı başlangıç kılavuzu aşağıda verilmiştir.
- Python 3.6 veya üzeri - Python'ı yükleme
İstemcinin kimliğini doğrulama
Şema Kayıt Defteri Avro Kodlayıcısı ile etkileşim, şema grubu adını ve Şema Kayıt Defteri İstemci sınıfını alan bir AvroEncoder sınıfı örneğiyle başlar. İstemci oluşturucu, Event Hubs tam ad alanını ve Azure Active Directory kimlik bilgilerini alır:
Schema Registry örneğinin tam ad alanı şu biçimde olmalıdır:
<yournamespace>.servicebus.windows.net
.TokenCredential protokolunu uygulayan bir AAD kimlik bilgisi oluşturucuya geçirilmelidir. Azure-identity paketinde
TokenCredential
kullanılabilen protokol uygulamaları vardır. tarafındanazure-identity
sağlanan kimlik bilgisi türlerini kullanmak için lütfen pip ile Python için Azure Identity istemci kitaplığını yükleyin:
pip install azure-identity
- Ayrıca, zaman uyumsuz API'yi kullanmak için önce aiohttp:
pip install aiohttp
azure-schemaregistry kitaplığını kullanarak AvroEncoder oluşturun:
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Önemli kavramlar
AvroEncoder
Avro İkili Kodlaması ile şema kimliğine sahip bir içerik türüne kodlamak ve bu kodlamayı çözmek için API sağlar. Şema içeriğinden şema kimliklerini almak için SchemaRegistryClient kullanır (veya tam tersi).
Desteklenen ileti modelleri
ile AvroEncoder
birlikte çalışabilirlik için belirli Azure Mesajlaşma SDK'sı model sınıflarına destek eklendi. Bu modeller, ad alanı altında tanımlanan protokolün alt türleridir MessageType
azure.schemaregistry.encoder.avroencoder
. Şu anda desteklenen model sınıfları şunlardır:
azure.eventhub.EventData
Içinazure-eventhub>=5.9.0
İleti biçimi
Kodlama için kodlayıcıya MessageType protokolünden sonra gelen bir ileti türü sağlanırsa, karşılık gelen içerik ve içerik türü özelliklerini ayarlar; burada:
content
: Avro yükü (genel olarak biçime özgü yük)- Avro İkili Kodlaması
- NOT Şemayı içeren ve şemayı ileti yükünün dışına ve şema kayıt defterine taşımak için bu kodlayıcının amacını alt eden Avro Nesne Kapsayıcı Dosyası.
content type
: biçimindeavro/binary+<schema ID>
bir dize; burada:avro/binary
biçim göstergesidir<schema ID>
GUID'nin onaltılık gösterimidir, Şema Kayıt Defteri hizmetindeki dizeyle aynı biçim ve bayt sırasıdır.
İleti türü olarak geçirilirse EventData
nesnesinde EventData
aşağıdaki özellikler ayarlanır:
body
özelliği içerik değerine ayarlanır.content_type
özelliği içerik türü değerine ayarlanır.
İleti türü sağlanmazsa ve varsayılan olarak kodlayıcı aşağıdaki dikteyi oluşturur: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
Örnekler
Aşağıdaki bölümlerde, aşağıdakiler de dahil olmak üzere en yaygın Şema Kayıt Defteri görevlerinden bazılarını kapsayan çeşitli kod parçacıkları sağlanır:
Encoding
AvroEncoder.encode
verilen Avro şemasıyla içeriği kodlamak için yöntemini kullanın.
yöntemi daha önce Schema Registry hizmetine kaydedilmiş bir şema kullanır ve gelecekte kodlama kullanımı için şemayı önbelleğe alır. Şemanın hizmete önceden kaydedilmesini ve yöntemiyle encode
otomatik olarak kaydedilmesini önlemek için anahtar sözcük bağımsız değişkeni auto_register=True
oluşturucuya AvroEncoder
geçirilmelidir.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Kod çözme
AvroEncoder.decode
Avro ile kodlanmış içeriğin kodunu çözmek için aşağıdakilerden birini kullanarak yöntemini kullanın:
- MessageType protokolünün alt türü olan bir ileti nesnesi geçirme.
- Anahtarlar
content
(tür bayt) vecontent_type
(tür dizesi) ile bir dikte geçirme. yöntemi şemayı Şema Kayıt Defteri Hizmeti'nden otomatik olarak alır ve gelecekte kod çözme kullanımı için şemayı önbelleğe alır.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Event Hubs Gönderme Tümleştirmesi
Avro ile kodlanmış içeriğe ve karşılık gelen content_type
öğesine ayarlanmış bir EventData
nesne göndermek için Event Hubs ile body
tümleştirme.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Event Hubs Alma Tümleştirmesi
Bir EventData
nesne almak ve Avro ile kodlanmış body
değerin kodunu çözmek için Event Hubs ile tümleştirme.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Sorun giderme
Genel
Azure Schema Registry Avro Kodlayıcısı, Schema Registry hizmetiyle iletişim kurarken hatalarla karşılaşılırsa Azure Core'da tanımlanan özel durumları tetikler. Geçersiz içerik/içerik türleri ve geçersiz şemalarla ilgili hatalar sırasıyla ve azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
olarak azure.schemaregistry.encoder.avroencoder.InvalidContentError
oluşturulur ve burada __cause__
Apache Avro kitaplığı tarafından tetiklenen temel özel durum yer alır.
Günlüğe Kaydetme
Bu kitaplık, günlüğe kaydetme için standart günlük kitaplığını kullanır. HTTP oturumlarıyla ilgili temel bilgiler (URL'ler, üst bilgiler vb.) BİlGİ düzeyinde günlüğe kaydedilir.
İstek/yanıt gövdeleri ve kaydedilmemiş üst bilgiler de dahil olmak üzere ayrıntılı HATA AYıKLAMA düzeyi günlüğe kaydetme, bir istemcide şu bağımsız değişkenle logging_enable
etkinleştirilebilir:
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Benzer şekilde, logging_enable
istemci için etkinleştirilmemiş olsa bile tek bir işlem için ayrıntılı günlüğe kaydetmeyi etkinleştirebilir:
encoder.encode(dict_content, schema=definition, logging_enable=True)
Sonraki adımlar
Daha fazla örnek kod
Yaygın Azure Şema Kayıt Defteri Avro Kodlayıcısı senaryolarını gösteren diğer örnekler samples dizinindedir.
Katkıda bulunma
Bu proje, katkı ve önerilere açıktır. Çoğu durumda, sağladığınız katkıyı kullanmamız için bize hak tanıma hakkına sahip olduğunuzu ve bu hakkı bize tanıdığınızı bildiren bir Katkıda Bulunan Lisans Sözleşmesi’ni (CLA) kabul etmeniz gerekir. Ayrıntılar için bkz. https://cla.microsoft.com.
Bir çekme isteği gönderdiğinizde, CLA robotu bir CLA sağlamanız gerekip gerekmediğini otomatik olarak belirler ve çekme isteğini uygun şekilde donatır (örn. etiket, açıklama). Robot tarafından sağlanan yönergeleri izlemeniz yeterlidir. Bu işlemi, CLA’mızı kullanarak tüm depolarda yalnızca bir kere yapmanız gerekir.
Bu proje Microsoft Open Source Code of Conduct (Microsoft Açık Kaynak Kullanım Kuralları) belgesinde listelenen kurallara uygundur. Daha fazla bilgi için Kullanım Kuralları SSS bölümüne bakın veya ek sorular veya yorumlarla iletişime geçin opencode@microsoft.com .
Azure SDK for Python
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin