إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
Apache Avro هو نظام تسلسل البيانات شائع الاستخدام في عالم الدفق. الحل النموذجي هو وضع البيانات بتنسيق Avro في Apache Kafka، وبيانات التعريف في Confluent Schema Registry، ثم تشغيل الاستعلامات باستخدام إطار عمل دفق يتصل بكل من Kafka وSema Registry.
يدعم from_avro Azure Databricks الدوال و to_avro لإنشاء مسارات تدفق مع بيانات Avro في Kafka وبيانات التعريف في سجل المخطط. تقوم الدالة to_avro بترميز عمود كثنائي بتنسيق Avro وفك from_avro ترميز بيانات Avro الثنائية في عمود. تقوم كلتا الدالتين بتحويل عمود إلى عمود آخر، ويمكن أن يكون نوع بيانات SQL للإدخال/الإخراج نوعا معقدا أو نوعا بدائيا.
إشعار
الدالتين from_avro و to_avro :
- تتوفر في Python وSc scala وJava.
- يمكن تمريرها إلى وظائف SQL في كل من الاستعلامات الدفعية والاستعلامات المتدفقة.
راجع أيضا مصدر بيانات ملف Avro.
مثال المخطط المحدد يدويا
على غرار from_json to_json، يمكنك استخدام from_avro ومع to_avro أي عمود ثنائي. يمكنك تحديد مخطط Avro يدويا، كما في المثال التالي:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
مثال jsonFormatSchema
يمكنك أيضا تحديد مخطط كسلسلة JSON. على سبيل المثال، إذا كان /tmp/user.avsc :
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
يمكنك إنشاء سلسلة JSON:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
ثم استخدم المخطط في from_avro:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
مثال مع سجل المخطط
إذا كان نظام المجموعة الخاص بك يحتوي على خدمة سجل المخطط، from_avro فيمكن العمل معها بحيث لا تحتاج إلى تحديد مخطط Avro يدويا.
يوضح المثال التالي قراءة موضوع Kafka "t"، بافتراض أن المفتاح والقيمة مسجلان بالفعل في سجل المخطط كمواضيع "t-key" و"t-value" من الأنواع STRING و INT:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
بالنسبة إلى to_avro، قد لا يتطابق مخطط Avro الإخراج الافتراضي مع مخطط الموضوع الهدف في خدمة سجل المخطط للأسباب التالية:
- التعيين من نوع Spark SQL إلى مخطط Avro ليس واحدا لواحد. راجع الأنواع المدعومة لتحويل Spark SQL -> Avro.
- إذا كان مخطط Avro للإخراج المحول من نوع السجل، يكون
topLevelRecordاسم السجل ولا توجد مساحة اسم بشكل افتراضي.
إذا كان مخطط الإخراج الافتراضي ل to_avro يطابق مخطط الموضوع الهدف، يمكنك القيام بما يلي:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
وإلا، يجب توفير مخطط الموضوع الهدف في الدالة to_avro :
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
المصادقة على سجل مخطط Confluent خارجي
في Databricks Runtime 12.2 LTS وما فوق، يمكنك المصادقة على سجل مخطط Confluent خارجي. توضح الأمثلة التالية كيفية تكوين خيارات تسجيل المخطط لتضمين بيانات اعتماد المصادقة ومفاتيح واجهة برمجة التطبيقات.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
استخدام ملفات مخزن الثقة ومخزن المفاتيح في وحدات تخزين كتالوج Unity
في Databricks Runtime 14.3 LTS وما فوق، يمكنك استخدام ملفات مخزن الثقة ومخزن المفاتيح في وحدات تخزين كتالوج Unity للمصادقة على سجل مخطط Confluent. تحديث التكوين في المثال السابق باستخدام بناء الجملة التالي:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
استخدام وضع تطور المخطط مع from_avro
في Databricks Runtime 14.2 وما فوق، يمكنك استخدام وضع تطور المخطط مع from_avro. يؤدي تمكين وضع تطور المخطط إلى طرح UnknownFieldException الوظيفة بعد الكشف عن تطور المخطط. توصي Databricks بتكوين الوظائف باستخدام وضع تطور المخطط لإعادة التشغيل تلقائيا عند فشل المهمة. راجع اعتبارات الإنتاج للبث المنظم.
يعد تطور المخطط مفيدا إذا كنت تتوقع تطور مخطط بيانات المصدر بمرور الوقت واستيعاب جميع الحقول من مصدر البيانات. إذا كانت الاستعلامات الخاصة بك تحدد بالفعل بشكل صريح الحقول التي يجب الاستعلام عنها في مصدر البيانات، يتم تجاهل الحقول المضافة بغض النظر عن تطور المخطط.
avroSchemaEvolutionMode استخدم الخيار لتمكين تطور المخطط. يصف الجدول التالي خيارات وضع تطور المخطط:
| خيار | سلوك |
|---|---|
none |
Default. يتجاهل تطور المخطط وتستمر المهمة. |
restart |
UnknownFieldException يطرح عند الكشف عن تطور المخطط. يتطلب إعادة تشغيل مهمة. |
إشعار
يمكنك تغيير هذا التكوين بين مهام الدفق وإعادة استخدام نفس نقطة التحقق. يمكن أن يؤدي تعطيل تطور المخطط إلى إسقاط الأعمدة.
تكوين وضع التحليل
يمكنك تكوين وضع التحليل لتحديد ما إذا كنت تريد الفشل أو إرسال سجلات فارغة عند تعطيل وضع تطور المخطط وتطور المخطط بطريقة غير متوافقة مع الإصدارات السابقة. مع الإعدادات الافتراضية، from_avro يفشل عندما يلاحظ تغييرات مخطط غير متوافقة.
mode استخدم الخيار لتحديد وضع التحليل. يصف الجدول التالي خيار وضع التحليل:
| خيار | سلوك |
|---|---|
FAILFAST |
Default. يطرح SparkException خطأ تحليل مع من errorClass MALFORMED_AVRO_MESSAGE. |
PERMISSIVE |
يتم تجاهل خطأ تحليل ويتم إصدار سجل فارغ. |
إشعار
مع تمكين تطور المخطط، FAILFAST يطرح استثناءات فقط إذا كان السجل تالفة.
مثال على استخدام تطور المخطط وإعداد وضع التحليل
يوضح المثال التالي تمكين تطور المخطط وتحديد FAILFAST وضع التحليل مع سجل مخطط Confluent:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)