File Avro

Apache Avro adalah sistem serialisasi data. Avro menyediakan:

  • Struktur data model.
  • Format data biner yang ringkas, cepat.
  • File kontainer, untuk menyimpan data persisten.
  • Panggilan Prosedur Jarak Jauh (Remote Procedure Call/RPC).
  • Integrasi sederhana dengan bahasa dinamis. Pembuatan kode tidak diperlukan untuk membaca atau menulis file data atau menggunakan atau menerapkan protokol RPC. Pembuatan kode sebagai pengoptimalan opsional, hanya layak diterapkan untuk bahasa yang diketik secara statis.

Sumber data Avro mendukung:

  • Konversi skema: Konversi otomatis antara catatan Apache Spark SQL dan Avro.
  • Partisi: Membaca dan menulis data yang dipartisi dengan mudah tanpa konfigurasi tambahan.
  • Kompresi: Kompresi untuk digunakan saat menulis Avro ke disk. Jenis yang didukung adalah uncompressed, snappy, dan deflate. Anda juga dapat menentukan tingkat akses.
  • Nama rekaman: Rekam nama dan namespace layanan dengan melewati peta parameter dengan recordName dan recordNamespace.

Lihat juga Membaca dan menulis data Avro streaming.

Konfigurasi

Anda dapat mengubah perilaku sumber data Avro menggunakan berbagai parameter konfigurasi.

Untuk mengabaikan file tanpa ekstensi .avro saat membaca, Anda dapat mengatur parameter avro.mapred.ignore.inputs.without.extension dalam konfigurasi Hadoop. Default adalah false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Untuk mengonfigurasi kompresi saat menulis, atur properti Spark berikut:

  • Codec kompresi: spark.sql.avro.compression.codec. Codec yang didukung adalah snappy dan deflate. Codec default adalah snappy.
  • Jika codec kompresi adalah deflate, Anda dapat mengatur tingkat kompresi dengan: spark.sql.avro.deflate.level. Tingkat default adalah -1.

Anda dapat mengatur properti ini dalam konfigurasi Spark kluster atau saat runtime bahasa umum menggunakan spark.conf.set(). Misalnya:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Untuk Runtime Bahasa Umum Databricks 9.1 LTS ke atas, Anda dapat mengubah perilaku inferensi skema default di Avro dengan menyediakan opsi mergeSchema saat membaca file. Pengaturan mergeSchema untuk true menyimpulkan skema dari satu set file Avro di direktori target dan menggabungkannya daripada menyimpulkan skema baca dari satu file.

Jenis yang didukung untuk konversi Avro -> Spark SQL

Perpustakaan ini mendukung membaca semua jenis Avro. Jenis ini menggunakan pemetaan berikut dari jenis Avro ke jenis Spark SQL:

Jenis Avro Jenis SQL Spark
Boolean BooleanType
int IntegerType
long LongType
float FloatType
ganda DoubleType
byte BinaryType
string StringType
record StructType
enum StringType
array ArrayType
peta MapType
Teratasi BinaryType
penyatuan Lihat Jenis Union.

Jenis serikat pekerja

Sumber data Avro mendukung jenis pembacaan union. Avro menganggap tiga jenis berikut ini sebagai jenis union:

  • union(int, long) mematakan ke LongType.
  • union(float, double) mematakan ke DoubleType.
  • union(something, null), di mana something adalah jenis Avro yang didukung. Ini memetakan ke jenis Spark SQL yang somethingsama dengan nullable yang diatur ke true.

Semua jenis union lainnya adalah tipe yang kompleks. Jenis ini dipetakan ke StructType di mana nama bidangnya member0, member1, dan seterusnya, sesuai dengan anggota union. Ini konsisten dengan perilaku saat mengonversi antara Avro dan Parket.

Tipe logika

Sumber data Avro mendukung pembacaan jenis logis Avro berikut:

Jenis logis Avro Jenis Avro Jenis SQL Spark
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
desimal Teratasi DecimalType
desimal byte DecimalType

Catatan

Sumber data Avro mengabaikan dokumen, alias, dan properti lain yang ada dalam file Avro.

Jenis yang didukung untuk SQL Spark -> Konversi Avro

Perpustakaan ini mendukung penulisan semua jenis Spark SQL ke Avro. Untuk sebagian besar jenis, pemetaan dari jenis Spark ke jenis Avro mudah (misalnya IntegerType dikonversi ke int); berikut ini adalah daftar beberapa kasus khusus:

Jenis SQL Spark Jenis Avro Jenis logis Avro
ByteType int
ShortType int
BinaryType byte
DecimalType Teratasi desimal
TimestampType long timestamp-micros
DateType int date

Anda juga dapat menentukan seluruh skema Avro keluaran dengan opsi avroSchema, sehingga jenis Spark SQL dapat dikonversi menjadi jenis Avro lainnya. Konversi berikut tidak diterapkan secara default dan memerlukan skema Avro yang ditentukan pengguna:

Jenis SQL Spark Jenis Avro Jenis logis Avro
ByteType Teratasi
StringType enum
DecimalType byte desimal
TimestampType long timestamp-millis

Contoh

Contoh-contoh ini menggunakan file episodes.avro.

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Contoh ini menunjukkan skema Avro kustom:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Contoh ini menunjukkan opsi kompresi Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Contoh ini menunjukkan catatan Avro yang dipartisi:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Contoh ini menunjukkan nama rekaman dan ruang nama:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Untuk mengkueri data Avro di SQL, daftarkan file data sebagai tabel atau tampilan sementara:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Contoh buku catatan: Membaca dan menulis file Avro

Buku catatan berikut menunjukkan cara membaca dan menulis file Avro.

Membaca dan menulis buku catatan file Avro

Dapatkan buku catatan