إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
Apache Avro هو نظام تسلسل البيانات. يوفر Avro:
- بنيات البيانات الغنية.
- تنسيق بيانات ثنائي مضغوط وسريع.
- ملف حاوية، لتخزين البيانات الثابتة.
- استدعاء الإجراء عن بعد (RPC).
- تكامل بسيط مع اللغات الديناميكية. لا يلزم إنشاء التعليمات البرمجية لقراءة ملفات البيانات أو كتابتها ولا لاستخدام بروتوكولات RPC أو تنفيذها. إنشاء التعليمات البرمجية كتحسين اختياري، يستحق التنفيذ فقط للغات مكتوبة بشكل ثابت.
يدعم مصدر بيانات Avro ما يلي:
- تحويل المخطط: التحويل التلقائي بين سجلات Apache Spark SQL وAvro.
- التقسيم: قراءة وكتابة البيانات المقسمة بسهولة دون أي تكوين إضافي.
- الضغط: الضغط لاستخدامه عند كتابة Avro إلى القرص. الأنواع المدعومة هي
uncompressedوsnappyو.deflateيمكنك أيضا تحديد مستوى الانكماش. - أسماء السجلات: اسم السجل ومساحة الاسم عن طريق تمرير خريطة المعلمات مع
recordNameوrecordNamespace.
راجع أيضا قراءة وكتابة بيانات Avro المتدفقة.
التكوين
يمكنك تغيير سلوك مصدر بيانات Avro باستخدام معلمات التكوين المختلفة.
لتجاهل الملفات بدون .avro الملحق عند القراءة، يمكنك تعيين المعلمة avro.mapred.ignore.inputs.without.extension في تكوين Hadoop. الافتراضي هو false.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
لتكوين الضغط عند الكتابة، قم بتعيين خصائص Spark التالية:
- برنامج ترميز الضغط:
spark.sql.avro.compression.codec. برامج الترميز المدعومة هيsnappyوdeflate. برنامج الترميز الافتراضي هوsnappy. - إذا كان برنامج ترميز الضغط هو
deflate، يمكنك تعيين مستوى الضغط باستخدام:spark.sql.avro.deflate.level. المستوى الافتراضي هو-1.
يمكنك تعيين هذه الخصائص في تكوين Spark للمجموعة أو في وقت التشغيل باستخدام spark.conf.set(). على سبيل المثال:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
بالنسبة إلى Databricks Runtime 9.1 LTS والإصدارات الأحدث، يمكنك تغيير سلوك استنتاج المخطط الافتراضي في Avro عن طريق توفير mergeSchema الخيار عند قراءة الملفات. سيؤدي الإعداد mergeSchema إلى true استنتاج مخطط من مجموعة من ملفات Avro في الدليل الهدف ودمجها بدلا من استنتاج مخطط القراءة من ملف واحد.
الأنواع المدعومة لتحويل Avro -> Spark SQL
تدعم هذه المكتبة قراءة جميع أنواع Avro. يستخدم التعيين التالي من أنواع Avro إلى أنواع Spark SQL:
| نوع Avro | نوع Spark SQL |
|---|---|
| boolean | BooleanType |
| العدد الصحيح | IntegerType |
| طويل | LongType |
| عائم | FloatType |
| مزدوج | DoubleType |
| وحدات البايت | نوع ثنائي |
| سلسلة | StringType |
| سجل | نوع البنية |
| enum | StringType |
| صفيف | نوع الصفيف |
| map | نوع الخريطة |
| ثابت | نوع ثنائي |
| union | راجع أنواع الاتحاد. |
أنواع الاتحاد
يدعم مصدر بيانات Avro أنواع القراءة union . يعتبر Avro الأنواع الثلاثة التالية أنواعا union :
union(int, long)تعيين إلىLongType.union(float, double)تعيين إلىDoubleType.union(something, null)، حيثsomethingهو أي نوع Avro مدعوم. يتم تعيين هذا إلى نفس نوع Spark SQL مثل ،somethingمعnullableتعيين إلىtrue.
جميع الأنواع الأخرى union أنواع معقدة. يتم تعيينها إلى StructType حيث تكون member0أسماء الحقول ، member1وما إلى ذلك، وفقا لأعضاء union. هذا متناسق مع السلوك عند التحويل بين Avro وParquet.
الأنواع المنطقية
يدعم مصدر بيانات Avro قراءة أنواع Avro المنطقية التالية:
| نوع Avro المنطقي | نوع Avro | نوع Spark SQL |
|---|---|---|
| date | العدد الصحيح | DateType |
| الطابع الزمني-مللي ثانية | طويل | نوع الطابع الزمني |
| timestamp-micros | طويل | نوع الطابع الزمني |
| عشري | ثابت | نوع عشري |
| عشري | وحدات البايت | نوع عشري |
إشعار
يتجاهل مصدر بيانات Avro المستندات والأسماء المستعارة والخصائص الأخرى الموجودة في ملف Avro.
الأنواع المدعومة ل Spark SQL -> تحويل Avro
تدعم هذه المكتبة كتابة جميع أنواع Spark SQL في Avro. بالنسبة لمعظم الأنواع، يكون التعيين من أنواع Spark إلى أنواع Avro بسيطا (على سبيل المثال IntegerType يتم تحويله إلى int)؛ فيما يلي قائمة بالحالات الخاصة القليلة:
| نوع Spark SQL | نوع Avro | نوع Avro المنطقي |
|---|---|---|
| ByteType | العدد الصحيح | |
| ShortType | العدد الصحيح | |
| نوع ثنائي | وحدات البايت | |
| نوع عشري | ثابت | عشري |
| نوع الطابع الزمني | طويل | timestamp-micros |
| DateType | العدد الصحيح | date |
يمكنك أيضا تحديد مخطط Avro للإخراج بالكامل مع الخيار avroSchema، بحيث يمكن تحويل أنواع Spark SQL إلى أنواع Avro أخرى.
لا يتم تطبيق التحويلات التالية بشكل افتراضي وتتطلب مخطط Avro المحدد من قبل المستخدم:
| نوع Spark SQL | نوع Avro | نوع Avro المنطقي |
|---|---|---|
| ByteType | ثابت | |
| StringType | enum | |
| نوع عشري | وحدات البايت | عشري |
| نوع الطابع الزمني | طويل | الطابع الزمني-مللي ثانية |
الأمثلة
تستخدم هذه الأمثلة ملف episodes.avro .
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
يوضح هذا المثال مخطط Avro مخصصا:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
يوضح هذا المثال خيارات ضغط Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
يوضح هذا المثال سجلات Avro المقسمة:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
يوضح هذا المثال اسم السجل ومساحة الاسم:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
للاستعلام عن بيانات Avro في SQL، قم بتسجيل ملف البيانات كجدول أو طريقة عرض مؤقتة:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
مثال دفتر الملاحظات: قراءة ملفات Avro وكتابتها
يوضح دفتر الملاحظات التالي كيفية قراءة ملفات Avro وكتابتها.