File Avro
Apache Avro adalah sistem serialisasi data. Avro menyediakan:
- Struktur data model.
- Format data biner yang ringkas, cepat.
- File kontainer, untuk menyimpan data persisten.
- Panggilan Prosedur Jarak Jauh (Remote Procedure Call/RPC).
- Integrasi sederhana dengan bahasa dinamis. Pembuatan kode tidak diperlukan untuk membaca atau menulis file data atau menggunakan atau menerapkan protokol RPC. Pembuatan kode sebagai pengoptimalan opsional, hanya layak diterapkan untuk bahasa yang diketik secara statis.
Sumber data Avro mendukung:
- Konversi skema: Konversi otomatis antara catatan Apache Spark SQL dan Avro.
- Partisi: Membaca dan menulis data yang dipartisi dengan mudah tanpa konfigurasi tambahan.
- Kompresi: Kompresi untuk digunakan saat menulis Avro ke disk. Jenis yang didukung adalah
uncompressed
,snappy
, dandeflate
. Anda juga dapat menentukan tingkat akses. - Nama rekaman: Rekam nama dan namespace layanan dengan melewati peta parameter dengan
recordName
danrecordNamespace
.
Lihat juga Membaca dan menulis data Avro streaming.
Konfigurasi
Anda dapat mengubah perilaku sumber data Avro menggunakan berbagai parameter konfigurasi.
Untuk mengabaikan file tanpa ekstensi .avro
saat membaca, Anda dapat mengatur parameter avro.mapred.ignore.inputs.without.extension
dalam konfigurasi Hadoop. Default adalah false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Untuk mengonfigurasi kompresi saat menulis, atur properti Spark berikut:
- Codec kompresi:
spark.sql.avro.compression.codec
. Codec yang didukung adalahsnappy
dandeflate
. Codec default adalahsnappy
. - Jika codec kompresi adalah
deflate
, Anda dapat mengatur tingkat kompresi dengan:spark.sql.avro.deflate.level
. Tingkat default adalah-1
.
Anda dapat mengatur properti ini dalam konfigurasi Spark kluster atau saat runtime bahasa umum menggunakan spark.conf.set()
. Misalnya:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Untuk Runtime Bahasa Umum Databricks 9.1 LTS ke atas, Anda dapat mengubah perilaku inferensi skema default di Avro dengan menyediakan opsi mergeSchema
saat membaca file. Pengaturan mergeSchema
untuk true
menyimpulkan skema dari satu set file Avro di direktori target dan menggabungkannya daripada menyimpulkan skema baca dari satu file.
Jenis yang didukung untuk konversi Avro -> Spark SQL
Perpustakaan ini mendukung membaca semua jenis Avro. Jenis ini menggunakan pemetaan berikut dari jenis Avro ke jenis Spark SQL:
Jenis Avro | Jenis SQL Spark |
---|---|
Boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
ganda | DoubleType |
byte | BinaryType |
string | StringType |
record | StructType |
enum | StringType |
array | ArrayType |
peta | MapType |
Teratasi | BinaryType |
penyatuan | Lihat Jenis Union. |
Jenis serikat pekerja
Sumber data Avro mendukung jenis pembacaan union
. Avro menganggap tiga jenis berikut ini sebagai jenis union
:
union(int, long)
mematakan keLongType
.union(float, double)
mematakan keDoubleType
.union(something, null)
, di manasomething
adalah jenis Avro yang didukung. Ini memetakan ke jenis Spark SQL yangsomething
sama dengannullable
yang diatur ketrue
.
Semua jenis union
lainnya adalah tipe yang kompleks. Jenis ini dipetakan ke StructType
di mana nama bidangnya member0
, member1
, dan seterusnya, sesuai dengan anggota union
. Ini konsisten dengan perilaku saat mengonversi antara Avro dan Parket.
Tipe logika
Sumber data Avro mendukung pembacaan jenis logis Avro berikut:
Jenis logis Avro | Jenis Avro | Jenis SQL Spark |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
desimal | Teratasi | DecimalType |
desimal | byte | DecimalType |
Catatan
Sumber data Avro mengabaikan dokumen, alias, dan properti lain yang ada dalam file Avro.
Jenis yang didukung untuk SQL Spark -> Konversi Avro
Perpustakaan ini mendukung penulisan semua jenis Spark SQL ke Avro. Untuk sebagian besar jenis, pemetaan dari jenis Spark ke jenis Avro mudah (misalnya IntegerType
dikonversi ke int
); berikut ini adalah daftar beberapa kasus khusus:
Jenis SQL Spark | Jenis Avro | Jenis logis Avro |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | byte | |
DecimalType | Teratasi | desimal |
TimestampType | long | timestamp-micros |
DateType | int | date |
Anda juga dapat menentukan seluruh skema Avro keluaran dengan opsi avroSchema
, sehingga jenis Spark SQL dapat dikonversi menjadi jenis Avro lainnya.
Konversi berikut tidak diterapkan secara default dan memerlukan skema Avro yang ditentukan pengguna:
Jenis SQL Spark | Jenis Avro | Jenis logis Avro |
---|---|---|
ByteType | Teratasi | |
StringType | enum | |
DecimalType | byte | desimal |
TimestampType | long | timestamp-millis |
Contoh
Contoh-contoh ini menggunakan file episodes.avro.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Contoh ini menunjukkan skema Avro kustom:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Contoh ini menunjukkan opsi kompresi Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Contoh ini menunjukkan catatan Avro yang dipartisi:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Contoh ini menunjukkan nama rekaman dan ruang nama:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Untuk mengkueri data Avro di SQL, daftarkan file data sebagai tabel atau tampilan sementara:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Contoh buku catatan: Membaca dan menulis file Avro
Buku catatan berikut menunjukkan cara membaca dan menulis file Avro.