Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Apache Avro adalah sistem serialisasi data. Avro menyediakan:
- Struktur data yang kaya.
- Format data biner yang ringkas, cepat.
- File kontainer, untuk menyimpan data persisten.
- Panggilan Prosedur Jarak Jauh (Remote Procedure Call/RPC).
- Integrasi sederhana dengan bahasa dinamis. Pembuatan kode tidak diperlukan untuk membaca atau menulis file data atau menggunakan atau menerapkan protokol RPC. Pembuatan kode sebagai pengoptimalan opsional, hanya layak diterapkan untuk bahasa yang diketik secara statis.
Sumber data Avro mendukung:
- Konversi skema: Konversi otomatis antara rekaman Apache Spark SQL dan Avro.
- Partisi: Membaca dan menulis data yang dipartisi dengan mudah tanpa konfigurasi tambahan.
- Kompresi: Kompresi untuk digunakan saat menulis Avro ke disk. Jenis yang didukung adalah
uncompressed,snappy, dandeflate. Anda juga dapat menentukan tingkat kompresi. - Nama rekaman: Nama rekaman dan namespace dengan mengoper peta parameter dengan
recordNamedanrecordNamespace.
Lihat juga Membaca dan menulis data Avro streaming.
Konfigurasi
Anda dapat mengubah perilaku sumber data Avro menggunakan berbagai parameter konfigurasi.
Untuk mengabaikan file tanpa ekstensi .avro saat membaca, Anda dapat mengatur parameter avro.mapred.ignore.inputs.without.extension dalam konfigurasi Hadoop. Default adalah false.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Untuk mengonfigurasi pemadatan saat menulis, atur properti Spark berikut:
- Codec kompresi:
spark.sql.avro.compression.codec. Codec yang didukung adalahsnappydandeflate. Codec default adalahsnappy. - Jika codec kompresi
deflate, Anda dapat mengatur tingkat kompresi dengan:spark.sql.avro.deflate.level. Tingkat default adalah-1.
Anda dapat mengatur properti ini dalam konfigurasi Spark kluster atau saat runtime menggunakan spark.conf.set(). Contohnya:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Untuk Databricks Runtime 9.1 LTS ke atas, Anda dapat mengubah perilaku inferensi skema default di Avro dengan memberikan mergeSchema opsi saat membaca file. Mengatur mergeSchema ke true akan menyimpulkan skema dari sekumpulan file Avro di direktori target dan menggabungkannya daripada menyimpulkan skema baca dari satu file.
Jenis yang didukung untuk konversi Avro -> Spark SQL
Perpustakaan ini mendukung pembacaan semua tipe Avro. Jenis ini menggunakan pemetaan berikut dari jenis Avro ke jenis Spark SQL:
| Jenis Avro | Jenis SQL Spark |
|---|---|
| Boolean | BooleanType |
| int (integer) | Tipe Integer |
| panjang | Tipe Panjang |
| float | FloatType |
| ganda | DoubleType |
| Byte | BinaryType |
| string | StringType |
| rekaman | StructType |
| enum | StringType |
| array | ArrayType |
| memetakan | JenisPeta |
| Diperbaiki | BinaryType |
| penyatuan | Lihat Jenis union. |
Jenis serikat pekerja
Sumber data Avro mendukung pembacaan jenis union. Avro menganggap tiga jenis berikut ini sebagai jenis union:
-
union(int, long)mematakan keLongType. -
union(float, double)mematakan keDoubleType. -
union(something, null), di manasomethingadalah jenis Avro yang didukung. Ini memetakan ke jenis Spark SQL yang sama dengansomething, dengannullableditetapkan menjaditrue.
Semua jenis union lainnya adalah tipe yang kompleks. Pemetaan dilakukan ke StructType di mana nama bidang adalah member0, member1, dan seterusnya, sesuai dengan anggota union. Ini konsisten dengan perilaku saat mengonversi antara Avro dan Parquet.
Tipe logis
Sumber data Avro mendukung pembacaan jenis logika Avro berikut:
| Tipe logis Avro | Jenis Avro | Jenis SQL Spark |
|---|---|---|
| tanggal | int (integer) | TipeTanggal |
| cap waktu milidetik | panjang | Tipe Penanda Waktu |
| cap waktu mikrodetik | panjang | Tipe Penanda Waktu |
| desimal | Diperbaiki | TipeDesimal |
| desimal | Byte | TipeDesimal |
Catatan
Sumber data Avro mengabaikan dokumen, alias, dan properti lain yang ada dalam file Avro.
Jenis yang didukung untuk Spark SQL -> Pengonversian Avro
Perpustakaan ini mendukung penulisan semua jenis Spark SQL ke Avro. Untuk sebagian besar jenis, pemetaan dari jenis Spark ke jenis Avro sangat mudah (misalnya IntegerType dikonversi ke int); berikut ini adalah daftar beberapa kasus khusus:
| Jenis SQL Spark | Jenis Avro | Tipe logis Avro |
|---|---|---|
| ByteType | int (integer) | |
| ShortType | int (integer) | |
| BinaryType | Byte | |
| TipeDesimal | Diperbaiki | desimal |
| Tipe Penanda Waktu | panjang | cap waktu mikrodetik |
| TipeTanggal | int (integer) | tanggal |
Anda juga dapat menentukan seluruh skema Avro output dengan opsi avroSchema, sehingga jenis Spark SQL dapat dikonversi ke jenis Avro lainnya.
Konversi berikut tidak diterapkan secara default dan memerlukan skema Avro yang ditentukan pengguna:
| Jenis SQL Spark | Jenis Avro | Tipe logis Avro |
|---|---|---|
| ByteType | Diperbaiki | |
| StringType | enum | |
| TipeDesimal | Byte | desimal |
| Tipe Penanda Waktu | panjang | cap waktu milidetik |
Contoh
Contoh-contoh ini menggunakan file episodes.avro .
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Contoh ini menunjukkan skema Avro kustom:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Contoh ini menunjukkan opsi kompresi Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Contoh ini menunjukkan catatan Avro yang dipartisi:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Contoh ini menunjukkan nama rekaman dan ruang nama:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Phyton
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Untuk mengkueri data Avro di SQL, daftarkan file data sebagai tabel atau tampilan sementara:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Contoh buku catatan: Membaca dan menulis file Avro
Buku catatan berikut menunjukkan cara membaca dan menulis file Avro.