Aracılığıyla paylaş


Python için Azure Schema Registry Avro Kodlayıcısı istemci kitaplığı - sürüm 1.0.0

Azure Schema Registry, şema depolama, sürüm oluşturma ve yönetim sağlayan Azure Event Hubs tarafından barındırılan bir şema deposu hizmetidir. Bu paket, Şema Kayıt Defteri şema tanımlayıcılarını ve Avro ile kodlanmış içeriği içeren yükleri kodlama ve kod çözme özelliğine sahip bir Avro kodlayıcı sağlar.

Kaynak kodu | Paket (PyPi) | API başvuru belgeleri | Örnekleri | Changelog

Bildirim

Python 2.7 için Azure SDK Python paketleri desteği 01 Ocak 2022'de sona erdi. Daha fazla bilgi ve soru için lütfen https://github.com/Azure/azure-sdk-for-python/issues/20691

Başlarken

Paketi yükleme

Pip ile Python için Azure Schema Registry Avro Kodlayıcısı istemci kitaplığını yükleyin:

pip install azure-schemaregistry-avroencoder

Ön koşullar:

Bu paketi kullanmak için aşağıdakilere sahip olmanız gerekir:

İstemcinin kimliğini doğrulama

Şema Kayıt Defteri Avro Kodlayıcısı ile etkileşim, şema grubu adını ve Şema Kayıt Defteri İstemci sınıfını alan bir AvroEncoder sınıfı örneğiyle başlar. İstemci oluşturucu, Event Hubs tam ad alanını ve Azure Active Directory kimlik bilgilerini alır:

  • Schema Registry örneğinin tam ad alanı şu biçimde olmalıdır: <yournamespace>.servicebus.windows.net.

  • TokenCredential protokolunu uygulayan bir AAD kimlik bilgisi oluşturucuya geçirilmelidir. Azure-identity paketindeTokenCredential kullanılabilen protokol uygulamaları vardır. tarafından azure-identitysağlanan kimlik bilgisi türlerini kullanmak için lütfen pip ile Python için Azure Identity istemci kitaplığını yükleyin:

pip install azure-identity
  • Ayrıca, zaman uyumsuz API'yi kullanmak için önce aiohttp:
pip install aiohttp

azure-schemaregistry kitaplığını kullanarak AvroEncoder oluşturun:

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Önemli kavramlar

AvroEncoder

Avro İkili Kodlaması ile şema kimliğine sahip bir içerik türüne kodlamak ve bu kodlamayı çözmek için API sağlar. Şema içeriğinden şema kimliklerini almak için SchemaRegistryClient kullanır (veya tam tersi).

Desteklenen ileti modelleri

ile AvroEncoderbirlikte çalışabilirlik için belirli Azure Mesajlaşma SDK'sı model sınıflarına destek eklendi. Bu modeller, ad alanı altında tanımlanan protokolün alt türleridir MessageTypeazure.schemaregistry.encoder.avroencoder . Şu anda desteklenen model sınıfları şunlardır:

  • azure.eventhub.EventData Için azure-eventhub>=5.9.0

İleti biçimi

Kodlama için kodlayıcıya MessageType protokolünden sonra gelen bir ileti türü sağlanırsa, karşılık gelen içerik ve içerik türü özelliklerini ayarlar; burada:

  • content: Avro yükü (genel olarak biçime özgü yük)

    • Avro İkili Kodlaması
    • NOT Şemayı içeren ve şemayı ileti yükünün dışına ve şema kayıt defterine taşımak için bu kodlayıcının amacını alt eden Avro Nesne Kapsayıcı Dosyası.
  • content type: biçiminde avro/binary+<schema ID>bir dize; burada:

    • avro/binary biçim göstergesidir
    • <schema ID> GUID'nin onaltılık gösterimidir, Şema Kayıt Defteri hizmetindeki dizeyle aynı biçim ve bayt sırasıdır.

İleti türü olarak geçirilirse EventData nesnesinde EventData aşağıdaki özellikler ayarlanır:

  • body özelliği içerik değerine ayarlanır.
  • content_type özelliği içerik türü değerine ayarlanır.

İleti türü sağlanmazsa ve varsayılan olarak kodlayıcı aşağıdaki dikteyi oluşturur: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

Örnekler

Aşağıdaki bölümlerde, aşağıdakiler de dahil olmak üzere en yaygın Şema Kayıt Defteri görevlerinden bazılarını kapsayan çeşitli kod parçacıkları sağlanır:

Encoding

AvroEncoder.encode verilen Avro şemasıyla içeriği kodlamak için yöntemini kullanın. yöntemi daha önce Schema Registry hizmetine kaydedilmiş bir şema kullanır ve gelecekte kodlama kullanımı için şemayı önbelleğe alır. Şemanın hizmete önceden kaydedilmesini ve yöntemiyle encode otomatik olarak kaydedilmesini önlemek için anahtar sözcük bağımsız değişkeni auto_register=True oluşturucuya AvroEncoder geçirilmelidir.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

Kod çözme

AvroEncoder.decode Avro ile kodlanmış içeriğin kodunu çözmek için aşağıdakilerden birini kullanarak yöntemini kullanın:

  • MessageType protokolünün alt türü olan bir ileti nesnesi geçirme.
  • Anahtarlar content(tür bayt) ve content_type (tür dizesi) ile bir dikte geçirme. yöntemi şemayı Şema Kayıt Defteri Hizmeti'nden otomatik olarak alır ve gelecekte kod çözme kullanımı için şemayı önbelleğe alır.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Event Hubs Gönderme Tümleştirmesi

Avro ile kodlanmış içeriğe ve karşılık gelen content_typeöğesine ayarlanmış bir EventData nesne göndermek için Event Hubs ile body tümleştirme.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Event Hubs Alma Tümleştirmesi

Bir EventData nesne almak ve Avro ile kodlanmış body değerin kodunu çözmek için Event Hubs ile tümleştirme.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Sorun giderme

Genel

Azure Schema Registry Avro Kodlayıcısı, Schema Registry hizmetiyle iletişim kurarken hatalarla karşılaşılırsa Azure Core'da tanımlanan özel durumları tetikler. Geçersiz içerik/içerik türleri ve geçersiz şemalarla ilgili hatalar sırasıyla ve azure.schemaregistry.encoder.avroencoder.InvalidSchemaErrorolarak azure.schemaregistry.encoder.avroencoder.InvalidContentError oluşturulur ve burada __cause__ Apache Avro kitaplığı tarafından tetiklenen temel özel durum yer alır.

Günlüğe Kaydetme

Bu kitaplık, günlüğe kaydetme için standart günlük kitaplığını kullanır. HTTP oturumlarıyla ilgili temel bilgiler (URL'ler, üst bilgiler vb.) BİlGİ düzeyinde günlüğe kaydedilir.

İstek/yanıt gövdeleri ve kaydedilmemiş üst bilgiler de dahil olmak üzere ayrıntılı HATA AYıKLAMA düzeyi günlüğe kaydetme, bir istemcide şu bağımsız değişkenle logging_enable etkinleştirilebilir:

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

Benzer şekilde, logging_enable istemci için etkinleştirilmemiş olsa bile tek bir işlem için ayrıntılı günlüğe kaydetmeyi etkinleştirebilir:

encoder.encode(dict_content, schema=definition, logging_enable=True)

Sonraki adımlar

Daha fazla örnek kod

Yaygın Azure Şema Kayıt Defteri Avro Kodlayıcısı senaryolarını gösteren diğer örnekler samples dizinindedir.

Katkıda bulunma

Bu proje, katkı ve önerilere açıktır. Çoğu durumda, sağladığınız katkıyı kullanmamız için bize hak tanıma hakkına sahip olduğunuzu ve bu hakkı bize tanıdığınızı bildiren bir Katkıda Bulunan Lisans Sözleşmesi’ni (CLA) kabul etmeniz gerekir. Ayrıntılar için bkz. https://cla.microsoft.com.

Bir çekme isteği gönderdiğinizde, CLA robotu bir CLA sağlamanız gerekip gerekmediğini otomatik olarak belirler ve çekme isteğini uygun şekilde donatır (örn. etiket, açıklama). Robot tarafından sağlanan yönergeleri izlemeniz yeterlidir. Bu işlemi, CLA’mızı kullanarak tüm depolarda yalnızca bir kere yapmanız gerekir.

Bu proje Microsoft Open Source Code of Conduct (Microsoft Açık Kaynak Kullanım Kuralları) belgesinde listelenen kurallara uygundur. Daha fazla bilgi için Kullanım Kuralları SSS bölümüne bakın veya ek sorular veya yorumlarla iletişime geçin opencode@microsoft.com .