Azure Schema Registry Avro Encoder ügyfélkódtár Pythonhoz – 1.0.0-s verzió
Az Azure Schema Registry egy Azure Event Hubs által üzemeltetett sémaadattár-szolgáltatás, amely sématárolást, verziószámozást és felügyeletet biztosít. Ez a csomag egy Avro-kódolót biztosít, amely sémaregisztrációs sémaazonosítókat és Avro-kódolású tartalmat tartalmazó hasznos adatok kódolására és dekódolására képes.
Forráskód | Csomag (PyPi) | API-referenciadokumentáció | Minták | Változatlan
Felelősséget kizáró nyilatkozat
Az Azure SDK Python-csomagok támogatása a Python 2.7-hez 2022. január 01-én véget ért. További információkért és kérdésekért lásd: https://github.com/Azure/azure-sdk-for-python/issues/20691
Első lépések
A csomag telepítése
Telepítse a Pythonhoz készült Azure Schema Registry Avro Encoder ügyfélkódtárat a pip használatával:
pip install azure-schemaregistry-avroencoder
Előfeltételek:
A csomag használatához a következőkre van szüksége:
- Azure-előfizetés – Ingyenes fiók létrehozása
- Azure Schema Registry - Az alábbi rövid útmutatóból megtudhatja, hogyan hozhat létre sémaregisztrációs adatbáziscsoportot a Azure Portal használatával.
- Python 3.6 vagy újabb – A Python telepítése
Az ügyfél hitelesítése
A Schema Registry Avro Encoderrel való interakció az AvroEncoder osztály egy példányával kezdődik, amely a sémacsoport nevét és a Sémaregisztrációs adatbázis ügyfélosztályát veszi fel. Az ügyfélkonstruktor az Event Hubs teljes névterét és az Azure Active Directory-hitelesítő adatokat használja:
A Schema Registry-példány teljes névterének a következő formátumot kell követnie:
<yournamespace>.servicebus.windows.net
.A TokenCredential protokollt megvalósító AAD-hitelesítő adatokat át kell adni a konstruktornak. Az azure-identity csomagban elérhető a protokoll implementációja
TokenCredential
. A általazure-identity
biztosított hitelesítőadat-típusok használatához telepítse a Pythonhoz készült Azure Identity ügyfélkódtárat a pip használatával:
pip install azure-identity
- Emellett az async API használatához először telepítenie kell egy aszinkron átvitelt, például aiohttp:
pip install aiohttp
AvroEncoder létrehozása az azure-schemaregistry kódtár használatával:
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Fő fogalmak
AvroEncoder
Api-t biztosít az Avro Binary Encoding kódolásához és dekódolásához, valamint egy sémaazonosítóval rendelkező tartalomtípust. A SchemaRegistryClient használatával sémaazonosítókat szerez be a sématartalomból, vagy fordítva.
Támogatott üzenetmodellek
Bizonyos Azure Messaging SDK-modellosztályokhoz támogatás lett hozzáadva a AvroEncoder
szolgáltatással való együttműködés érdekében. Ezek a modellek a MessageType
névtérben azure.schemaregistry.encoder.avroencoder
meghatározott protokoll altípusai. Jelenleg a támogatott modellosztályok a következők:
azure.eventhub.EventData
a következőhöz:azure-eventhub>=5.9.0
Üzenetformátum
Ha a Kódoláshoz a MessageType protokollt követő üzenettípust adja meg a kódolónak, akkor beállítja a megfelelő tartalom- és tartalomtípus-tulajdonságokat, ahol:
content
: Avro hasznos adat (általában formátumspecifikus hasznos adatok)- Avro bináris kódolás
- NOT Avro Objektumtároló-fájl, amely tartalmazza a sémát, és legyőzi ennek a kódolónak a célját, hogy áthelyezze a sémát az üzenet hasznos adataiból és a sémaregisztrációs adatbázisba.
content type
: a formátumavro/binary+<schema ID>
sztringje, ahol:avro/binary
a formátumjelző<schema ID>
A a GUID hexadecimális reprezentációja, formátuma és bájtsorrendje megegyezik a Schema Registry szolgáltatás sztringével.
Ha EventData
a rendszer üzenettípusként adja át, a következő tulajdonságok lesznek beállítva az EventData
objektumon:
- A
body
tulajdonság a tartalomértékre lesz beállítva. - A
content_type
tulajdonság a tartalomtípus értékére lesz beállítva.
Ha nincs megadva üzenettípus, és alapértelmezés szerint a kódoló a következő diktálásokat hozza létre: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
Példák
A következő szakaszok számos kódrészletet biztosítanak, amelyek a sémaregisztrációs adatbázis leggyakoribb feladatait fedik le, többek között az alábbiakat:
Encoding
AvroEncoder.encode
A metódussal kódolhatja a tartalmat a megadott Avro-sémával.
A metódus egy korábban a Schema Registry szolgáltatásban regisztrált sémát fog használni, és a séma gyorsítótárazott marad a jövőbeli kódolási használat érdekében. Annak érdekében, hogy a séma ne legyen előre regisztrálva a szolgáltatásban, és automatikusan regisztrálva legyen a encode
metódussal, a kulcsszóargumentumot auto_register=True
át kell adni a AvroEncoder
konstruktornak.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Dekódolás
Az Avro-kódolású tartalom dekódolásához használja a AvroEncoder.decode
metódust a következők egyikével:
- Olyan üzenetobjektum átadása, amely a MessageType protokoll altípusa.
- Diktálás kulcsokkal
content
(bájtok beírása) éscontent_type
(sztring beírása) A metódus automatikusan lekéri a sémát a Sémaregisztrációs adatbázis szolgáltatásból, és gyorsítótárazza a sémát a jövőbeli dekódolási használat érdekében.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Event Hubs-küldési integráció
Integráció az Event Hubs szolgáltatással body
egy EventData
Avro-kódolású tartalomra beállított és megfelelő content_type
objektum küldéséhez.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Integrációt fogadó Event Hubs
Integráció az Event Hubs szolgáltatással egy EventData
objektum fogadásához és az Avro által kódolt érték dekódolásához body
.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Hibaelhárítás
Általános kérdések
Az Azure Schema Registry Avro Encoder kivételeket hoz létre az Azure Core-ban , ha hibák történnek a Schema Registry szolgáltatással való kommunikáció során. Az érvénytelen tartalom-/tartalomtípusokkal és érvénytelen sémákkal kapcsolatos hibák a és azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
a néven jelennek megazure.schemaregistry.encoder.avroencoder.InvalidContentError
, ahol __cause__
az Apache Avro-kódtár által kiváltott mögöttes kivételt fogja tartalmazni.
Naplózás
Ez a kódtár a szabványos naplózási kódtárat használja a naplózáshoz. A HTTP-munkamenetekkel (URL-címekkel, fejlécekkel stb.) kapcsolatos alapvető információk az INFO szinten naplózva lesznek.
A hibakeresési szint részletes naplózása, beleértve a kérelem-/választörzseket és a nem aktivált fejléceket, engedélyezhető az ügyfélen a logging_enable
következő argumentummal:
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Hasonlóképpen logging_enable
engedélyezheti a részletes naplózást egyetlen művelethez, még akkor is, ha nincs engedélyezve az ügyfél számára:
encoder.encode(dict_content, schema=definition, logging_enable=True)
Következő lépések
További mintakód
Az Azure Schema Registry Avro Encoder gyakori forgatókönyveit bemutató további példák a mintakönyvtárban találhatók.
Közreműködés
A projektben szívesen fogadjuk a hozzájárulásokat és a javaslatokat. A legtöbb hozzájáruláshoz el kell fogadnia egy Közreműködői licencszerződést (CLA-t), amelyben kijelenti, hogy jogosult arra, hogy ránk ruházza hozzájárulása felhasználási jogát, és ezt ténylegesen meg is teszi. További részletekért lásd: https://cla.microsoft.com.
A lekéréses kérelmek elküldésekor egy CLA-robot automatikusan meghatározza, hogy kell-e biztosítania CLA-t, és megfelelően kitölti a lekéréses kérelmet (például címke, megjegyzés). Egyszerűen csak kövesse a robot által megadott utasításokat. Ezt csak egyszer kell elvégeznie az összes olyan tárházban, amely a CLA-t használja.
A projekt a Microsoft nyílt forráskódú projekteket szabályozó etikai kódexe, a Microsoft Open Source Code of Conduct hatálya alá esik. További információkért lásd a viselkedési szabályzattal kapcsolatos gyakori kérdéseket , vagy vegye fel a kapcsolatot opencode@microsoft.com az esetleges további kérdésekkel vagy megjegyzésekkel.
Azure SDK for Python
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: