File Avro

Apache Avro adalah sistem serialisasi data. Avro menyediakan:

  • Struktur data yang kaya.
  • Format data biner yang ringkas, cepat.
  • File kontainer, untuk menyimpan data persisten.
  • Panggilan Prosedur Jarak Jauh (Remote Procedure Call/RPC).
  • Integrasi sederhana dengan bahasa dinamis. Pembuatan kode tidak diperlukan untuk membaca atau menulis file data atau menggunakan atau menerapkan protokol RPC. Pembuatan kode sebagai pengoptimalan opsional, hanya layak diterapkan untuk bahasa yang diketik secara statis.

Sumber data Avro mendukung:

  • Konversi skema: Konversi otomatis antara rekaman Apache Spark SQL dan Avro.
  • Partisi: Membaca dan menulis data yang dipartisi dengan mudah tanpa konfigurasi tambahan.
  • Kompresi: Kompresi untuk digunakan saat menulis Avro ke disk. Jenis yang didukung adalah uncompressed, snappy, dan deflate. Anda juga dapat menentukan tingkat kompresi.
  • Nama rekaman: Nama rekaman dan namespace dengan mengoper peta parameter dengan recordName dan recordNamespace.

Lihat juga Membaca dan menulis data Avro streaming.

Konfigurasi

Anda dapat mengubah perilaku sumber data Avro menggunakan berbagai parameter konfigurasi.

Untuk mengabaikan file tanpa ekstensi .avro saat membaca, Anda dapat mengatur parameter avro.mapred.ignore.inputs.without.extension dalam konfigurasi Hadoop. Default adalah false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Untuk mengonfigurasi pemadatan saat menulis, atur properti Spark berikut:

  • Codec kompresi: spark.sql.avro.compression.codec. Codec yang didukung adalah snappy dan deflate. Codec default adalah snappy.
  • Jika codec kompresi deflate, Anda dapat mengatur tingkat kompresi dengan: spark.sql.avro.deflate.level. Tingkat default adalah -1.

Anda dapat mengatur properti ini dalam konfigurasi Spark kluster atau saat runtime menggunakan spark.conf.set(). Contohnya:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Untuk Databricks Runtime 9.1 LTS ke atas, Anda dapat mengubah perilaku inferensi skema default di Avro dengan memberikan mergeSchema opsi saat membaca file. Mengatur mergeSchema ke true akan menyimpulkan skema dari sekumpulan file Avro di direktori target dan menggabungkannya daripada menyimpulkan skema baca dari satu file.

Jenis yang didukung untuk konversi Avro -> Spark SQL

Perpustakaan ini mendukung pembacaan semua tipe Avro. Jenis ini menggunakan pemetaan berikut dari jenis Avro ke jenis Spark SQL:

Jenis Avro Jenis SQL Spark
Boolean BooleanType
int (integer) Tipe Integer
panjang Tipe Panjang
float FloatType
ganda DoubleType
Byte BinaryType
string StringType
rekaman StructType
enum StringType
array ArrayType
memetakan JenisPeta
Diperbaiki BinaryType
penyatuan Lihat Jenis union.

Jenis serikat pekerja

Sumber data Avro mendukung pembacaan jenis union. Avro menganggap tiga jenis berikut ini sebagai jenis union:

  • union(int, long) mematakan ke LongType.
  • union(float, double) mematakan ke DoubleType.
  • union(something, null), di mana something adalah jenis Avro yang didukung. Ini memetakan ke jenis Spark SQL yang sama dengan something, dengan nullable ditetapkan menjadi true.

Semua jenis union lainnya adalah tipe yang kompleks. Pemetaan dilakukan ke StructType di mana nama bidang adalah member0, member1, dan seterusnya, sesuai dengan anggota union. Ini konsisten dengan perilaku saat mengonversi antara Avro dan Parquet.

Tipe logis

Sumber data Avro mendukung pembacaan jenis logika Avro berikut:

Tipe logis Avro Jenis Avro Jenis SQL Spark
tanggal int (integer) TipeTanggal
cap waktu milidetik panjang Tipe Penanda Waktu
cap waktu mikrodetik panjang Tipe Penanda Waktu
desimal Diperbaiki TipeDesimal
desimal Byte TipeDesimal

Catatan

Sumber data Avro mengabaikan dokumen, alias, dan properti lain yang ada dalam file Avro.

Jenis yang didukung untuk Spark SQL -> Pengonversian Avro

Perpustakaan ini mendukung penulisan semua jenis Spark SQL ke Avro. Untuk sebagian besar jenis, pemetaan dari jenis Spark ke jenis Avro sangat mudah (misalnya IntegerType dikonversi ke int); berikut ini adalah daftar beberapa kasus khusus:

Jenis SQL Spark Jenis Avro Tipe logis Avro
ByteType int (integer)
ShortType int (integer)
BinaryType Byte
TipeDesimal Diperbaiki desimal
Tipe Penanda Waktu panjang cap waktu mikrodetik
TipeTanggal int (integer) tanggal

Anda juga dapat menentukan seluruh skema Avro output dengan opsi avroSchema, sehingga jenis Spark SQL dapat dikonversi ke jenis Avro lainnya. Konversi berikut tidak diterapkan secara default dan memerlukan skema Avro yang ditentukan pengguna:

Jenis SQL Spark Jenis Avro Tipe logis Avro
ByteType Diperbaiki
StringType enum
TipeDesimal Byte desimal
Tipe Penanda Waktu panjang cap waktu milidetik

Contoh

Contoh-contoh ini menggunakan file episodes.avro .

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Contoh ini menunjukkan skema Avro kustom:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Contoh ini menunjukkan opsi kompresi Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Contoh ini menunjukkan catatan Avro yang dipartisi:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Contoh ini menunjukkan nama rekaman dan ruang nama:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Phyton

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Untuk mengkueri data Avro di SQL, daftarkan file data sebagai tabel atau tampilan sementara:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Contoh buku catatan: Membaca dan menulis file Avro

Buku catatan berikut menunjukkan cara membaca dan menulis file Avro.

Membaca dan menulis buku catatan file Avro

Dapatkan buku catatan