Aracılığıyla paylaş


Python için Azure Schema Registry Avro Seri hale getirici istemci kitaplığı - sürüm 1.0.0b4

Azure Schema Registry, Azure Event Hubs tarafından barındırılan, şema depolama, sürüm oluşturma ve yönetim sağlayan bir şema deposu hizmetidir. Bu paket, Şema Kayıt Defteri şema tanımlayıcılarını ve Avro ile kodlanmış verileri içeren yükleri seri hale getirebilen ve seri durumdan çıkarabilen bir Avro seri hale getirici sağlar.

Kaynak kodu | Paket (PyPi) | API başvuru belgeleri | Örnekleri | Changelog

Bildirim

Python 2.7 için Azure SDK Python paketleri desteği 01 Ocak 2022'de sona eriyor. Daha fazla bilgi ve soru için lütfen https://github.com/Azure/azure-sdk-for-python/issues/20691

Başlarken

Paketi yükleme

Pip ile Python için Azure Schema Registry Avro Seri Hale Getirici istemci kitaplığını ve Azure Identity istemci kitaplığını yükleyin:

pip install azure-schemaregistry-avroserializer azure-identity

Ön koşullar:

Bu paketi kullanmak için aşağıdakilere sahip olmanız gerekir:

İstemcinin kimliğini doğrulama

Schema Registry Avro Serializer ile etkileşim, şema grubu adını ve Schema Registry client sınıfını alan AvroSerializer sınıfının bir örneğiyle başlar. İstemci oluşturucu, Event Hubs tam ad alanını ve Azure Active Directory kimlik bilgilerini alır:

  • Schema Registry örneğinin tam ad alanı şu biçimde olmalıdır: <yournamespace>.servicebus.windows.net.

  • TokenCredential protokolunu uygulayan bir AAD kimlik bilgisi oluşturucuya geçirilmelidir. Azure-identity paketindeTokenCredential protokolün uygulamaları vardır. tarafından azure-identitysağlanan kimlik bilgisi türlerini kullanmak için lütfen pip ile Python için Azure Identity istemci kitaplığını yükleyin:

pip install azure-identity
  • Ayrıca Python 3.6+ üzerinde desteklenen zaman uyumsuz API'yi kullanmak için önce aiohttp gibi bir zaman uyumsuz aktarım yüklemeniz gerekir:
pip install aiohttp

azure-schemaregistry kitaplığını kullanarak AvroSerializer oluşturun:

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

Önemli kavramlar

AvroSerializer

Avro İkili Kodlaması'na seri hale getirmek ve seri durumdan çıkarmanın yanı sıra şema kimliğine sahip bir üst bilgi sağlamak için API sağlar. Şema içeriğinden şema kimliklerini almak için SchemaRegistryClient kullanır (veya tam tersi).

İleti biçimi

Aynı biçim, Azure SDK dillerindeki şema kayıt defteri serileştiricileri tarafından kullanılır.

İletiler aşağıdaki gibi kodlanır:

  • 4 bayt: Biçim Göstergesi

    • Şu anda aşağıda biçimi belirtmek için her zaman sıfırdır.
  • 32 bayt: Şema Kimliği

    • GUID'nin UTF-8 onaltılık gösterimi.
    • 32 onaltılık basamak, kısa çizgi yok.
    • Schema Registry hizmetindeki dize ile aynı biçim ve bayt sırası.
  • Kalan bayt sayısı: Avro yükü (genel olarak biçime özgü yük)

    • Avro İkili Kodlaması
    • ŞEMAyı içeren ve şemayı ileti yükünün dışına ve şema kayıt defterine taşımak için bu seri oluşturucunun amacını yenen NOT Avro Nesne Kapsayıcı Dosyası.

Örnekler

Aşağıdaki bölümlerde, aşağıdakiler dahil olmak üzere en yaygın Schema Registry görevlerinden bazılarını kapsayan çeşitli kod parçacıkları sağlanır:

Serileştirme

Verilen avro şemasıyla dikte verilerini seri hale getirmek için yöntemini kullanın AvroSerializer.serialize . yöntemi daha önce Schema Registry hizmetine kaydedilmiş bir şema kullanır ve gelecekte serileştirme kullanımı için şemayı önbelleğe alır. Ayrıca şemayı hizmete önceden kaydetmekten ve anahtar sözcük bağımsız değişkeniyle serializeauto_register_schemas=Trueörneği başlatarak AvroSerializer yöntemine otomatik olarak kaydolmaktan kaçınmak da mümkündür.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    encoded_bytes = serializer.serialize(dict_data, schema=definition)

Deserialization

Ham baytları dikte verilerine seri durumdan çıkartmak için yöntemini kullanın AvroSerializer.deserialize . yöntemi şemayı Schema Registry Service'ten otomatik olarak alır ve gelecekte seri durumdan çıkarma kullanımı için şemayı önbelleğe alır.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
    decoded_data = serializer.deserialize(encoded_bytes)

Event Hubs Gönderme Tümleştirmesi

Serileştirilmiş avro dikte verilerini EventData'nın gövdesi olarak göndermek için Event Hubs ile tümleştirme.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
    event_data_batch = eventhub_producer.create_batch()
    dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
    event_data_batch.add(EventData(body=payload_bytes))
    eventhub_producer.send_batch(event_data_batch)

Event Hubs Alma Tümleştirmesi

Avro dikte verilerini almak ve seri durumdan çıkarılmış ham baytları almak EventData için Event Hubs ile tümleştirme.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    bytes_payload = b"".join(b for b in event.body)
    deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Sorun giderme

Genel

Azure Schema Registry Avro Seri Hale Getirici , Azure Core'da tanımlanan özel durumları tetikler.

Günlüğe Kaydetme

Bu kitaplık , günlüğe kaydetme için standart günlük kitaplığını kullanır. HTTP oturumları (URL'ler, üst bilgiler vb.) hakkındaki temel bilgiler BİlGİ düzeyinde günlüğe kaydedilir.

İstek/yanıt gövdeleri ve kaydedilmemiş üst bilgiler de dahil olmak üzere ayrıntılı HATA AYıKLAMA düzeyi günlüğe kaydetme, istemcide şu bağımsız değişkenle logging_enable etkinleştirilebilir:

import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")

Benzer şekilde, logging_enable istemci için etkinleştirilmemiş olsa bile tek bir işlem için ayrıntılı günlüğe kaydetmeyi etkinleştirebilir:

serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)

Sonraki adımlar

Daha fazla örnek kod

Yaygın Azure Schema Registry Avro Seri hale getirici senaryolarını gösteren samples dizininde daha fazla örnek bulabilirsiniz.

Katkıda bulunma

Bu proje, katkı ve önerilere açıktır. Çoğu durumda, sağladığınız katkıyı kullanmamız için bize hak tanıma hakkına sahip olduğunuzu ve bu hakkı bize tanıdığınızı bildiren bir Katkıda Bulunan Lisans Sözleşmesi’ni (CLA) kabul etmeniz gerekir. Ayrıntılar için bkz. https://cla.microsoft.com.

Bir çekme isteği gönderdiğinizde, CLA robotu bir CLA sağlamanız gerekip gerekmediğini otomatik olarak belirler ve çekme isteğini uygun şekilde donatır (örn. etiket, açıklama). Robot tarafından sağlanan yönergeleri izlemeniz yeterlidir. Bu işlemi, CLA’mızı kullanarak tüm depolarda yalnızca bir kere yapmanız gerekir.

Bu proje Microsoft Open Source Code of Conduct (Microsoft Açık Kaynak Kullanım Kuralları) belgesinde listelenen kurallara uygundur. Daha fazla bilgi için Kullanım Kuralları SSS bölümüne bakın veya ek sorularınız veya yorumlarınızla iletişime geçin opencode@microsoft.com .