Python için Azure Schema Registry Avro Seri hale getirici istemci kitaplığı - sürüm 1.0.0b4
Azure Schema Registry, Azure Event Hubs tarafından barındırılan, şema depolama, sürüm oluşturma ve yönetim sağlayan bir şema deposu hizmetidir. Bu paket, Şema Kayıt Defteri şema tanımlayıcılarını ve Avro ile kodlanmış verileri içeren yükleri seri hale getirebilen ve seri durumdan çıkarabilen bir Avro seri hale getirici sağlar.
Kaynak kodu | Paket (PyPi) | API başvuru belgeleri | Örnekleri | Changelog
Bildirim
Python 2.7 için Azure SDK Python paketleri desteği 01 Ocak 2022'de sona eriyor. Daha fazla bilgi ve soru için lütfen https://github.com/Azure/azure-sdk-for-python/issues/20691
Başlarken
Paketi yükleme
Pip ile Python için Azure Schema Registry Avro Seri Hale Getirici istemci kitaplığını ve Azure Identity istemci kitaplığını yükleyin:
pip install azure-schemaregistry-avroserializer azure-identity
Ön koşullar:
Bu paketi kullanmak için aşağıdakilere sahip olmanız gerekir:
- Azure aboneliği - Ücretsiz hesap oluşturma
- Azure Schema Registry
- Python 2.7, 3.6 veya üzeri - Python'ı yükleme
İstemcinin kimliğini doğrulama
Schema Registry Avro Serializer ile etkileşim, şema grubu adını ve Schema Registry client sınıfını alan AvroSerializer sınıfının bir örneğiyle başlar. İstemci oluşturucu, Event Hubs tam ad alanını ve Azure Active Directory kimlik bilgilerini alır:
Schema Registry örneğinin tam ad alanı şu biçimde olmalıdır:
<yournamespace>.servicebus.windows.net
.TokenCredential protokolunu uygulayan bir AAD kimlik bilgisi oluşturucuya geçirilmelidir. Azure-identity paketinde
TokenCredential
protokolün uygulamaları vardır. tarafındanazure-identity
sağlanan kimlik bilgisi türlerini kullanmak için lütfen pip ile Python için Azure Identity istemci kitaplığını yükleyin:
pip install azure-identity
- Ayrıca Python 3.6+ üzerinde desteklenen zaman uyumsuz API'yi kullanmak için önce aiohttp gibi bir zaman uyumsuz aktarım yüklemeniz gerekir:
pip install aiohttp
azure-schemaregistry kitaplığını kullanarak AvroSerializer oluşturun:
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
Önemli kavramlar
AvroSerializer
Avro İkili Kodlaması'na seri hale getirmek ve seri durumdan çıkarmanın yanı sıra şema kimliğine sahip bir üst bilgi sağlamak için API sağlar. Şema içeriğinden şema kimliklerini almak için SchemaRegistryClient kullanır (veya tam tersi).
İleti biçimi
Aynı biçim, Azure SDK dillerindeki şema kayıt defteri serileştiricileri tarafından kullanılır.
İletiler aşağıdaki gibi kodlanır:
4 bayt: Biçim Göstergesi
- Şu anda aşağıda biçimi belirtmek için her zaman sıfırdır.
32 bayt: Şema Kimliği
- GUID'nin UTF-8 onaltılık gösterimi.
- 32 onaltılık basamak, kısa çizgi yok.
- Schema Registry hizmetindeki dize ile aynı biçim ve bayt sırası.
Kalan bayt sayısı: Avro yükü (genel olarak biçime özgü yük)
- Avro İkili Kodlaması
- ŞEMAyı içeren ve şemayı ileti yükünün dışına ve şema kayıt defterine taşımak için bu seri oluşturucunun amacını yenen NOT Avro Nesne Kapsayıcı Dosyası.
Örnekler
Aşağıdaki bölümlerde, aşağıdakiler dahil olmak üzere en yaygın Schema Registry görevlerinden bazılarını kapsayan çeşitli kod parçacıkları sağlanır:
Serileştirme
Verilen avro şemasıyla dikte verilerini seri hale getirmek için yöntemini kullanın AvroSerializer.serialize
.
yöntemi daha önce Schema Registry hizmetine kaydedilmiş bir şema kullanır ve gelecekte serileştirme kullanımı için şemayı önbelleğe alır. Ayrıca şemayı hizmete önceden kaydetmekten ve anahtar sözcük bağımsız değişkeniyle serialize
auto_register_schemas=True
örneği başlatarak AvroSerializer
yöntemine otomatik olarak kaydolmaktan kaçınmak da mümkündür.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
encoded_bytes = serializer.serialize(dict_data, schema=definition)
Deserialization
Ham baytları dikte verilerine seri durumdan çıkartmak için yöntemini kullanın AvroSerializer.deserialize
.
yöntemi şemayı Schema Registry Service'ten otomatik olarak alır ve gelecekte seri durumdan çıkarma kullanımı için şemayı önbelleğe alır.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
with serializer:
encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
decoded_data = serializer.deserialize(encoded_bytes)
Event Hubs Gönderme Tümleştirmesi
Serileştirilmiş avro dikte verilerini EventData'nın gövdesi olarak göndermek için Event Hubs ile tümleştirme.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_serializer:
event_data_batch = eventhub_producer.create_batch()
dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
event_data_batch.add(EventData(body=payload_bytes))
eventhub_producer.send_batch(event_data_batch)
Event Hubs Alma Tümleştirmesi
Avro dikte verilerini almak ve seri durumdan çıkarılmış ham baytları almak EventData
için Event Hubs ile tümleştirme.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
bytes_payload = b"".join(b for b in event.body)
deserialized_data = avro_serializer.deserialize(bytes_payload)
with eventhub_consumer, avro_serializer:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Sorun giderme
Genel
Azure Schema Registry Avro Seri Hale Getirici , Azure Core'da tanımlanan özel durumları tetikler.
Günlüğe Kaydetme
Bu kitaplık , günlüğe kaydetme için standart günlük kitaplığını kullanır. HTTP oturumları (URL'ler, üst bilgiler vb.) hakkındaki temel bilgiler BİlGİ düzeyinde günlüğe kaydedilir.
İstek/yanıt gövdeleri ve kaydedilmemiş üst bilgiler de dahil olmak üzere ayrıntılı HATA AYıKLAMA düzeyi günlüğe kaydetme, istemcide şu bağımsız değişkenle logging_enable
etkinleştirilebilir:
import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")
Benzer şekilde, logging_enable
istemci için etkinleştirilmemiş olsa bile tek bir işlem için ayrıntılı günlüğe kaydetmeyi etkinleştirebilir:
serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)
Sonraki adımlar
Daha fazla örnek kod
Yaygın Azure Schema Registry Avro Seri hale getirici senaryolarını gösteren samples dizininde daha fazla örnek bulabilirsiniz.
Katkıda bulunma
Bu proje, katkı ve önerilere açıktır. Çoğu durumda, sağladığınız katkıyı kullanmamız için bize hak tanıma hakkına sahip olduğunuzu ve bu hakkı bize tanıdığınızı bildiren bir Katkıda Bulunan Lisans Sözleşmesi’ni (CLA) kabul etmeniz gerekir. Ayrıntılar için bkz. https://cla.microsoft.com.
Bir çekme isteği gönderdiğinizde, CLA robotu bir CLA sağlamanız gerekip gerekmediğini otomatik olarak belirler ve çekme isteğini uygun şekilde donatır (örn. etiket, açıklama). Robot tarafından sağlanan yönergeleri izlemeniz yeterlidir. Bu işlemi, CLA’mızı kullanarak tüm depolarda yalnızca bir kere yapmanız gerekir.
Bu proje Microsoft Open Source Code of Conduct (Microsoft Açık Kaynak Kullanım Kuralları) belgesinde listelenen kurallara uygundur. Daha fazla bilgi için Kullanım Kuralları SSS bölümüne bakın veya ek sorularınız veya yorumlarınızla iletişime geçin opencode@microsoft.com .
Azure SDK for Python