Avro dosyası
Apache Avro bir veri serileştirme sistemidir. Avro şu bilgileri sağlar:
- Zengin veri yapıları.
- Kompakt, hızlı, ikili veri biçimi.
- Kalıcı verileri depolamak için bir kapsayıcı dosyası.
- Uzaktan yordam çağrısı (RPC).
- Dinamik dillerle basit tümleştirme. Kod oluşturma, veri dosyalarını okumak veya yazmak ya da RPC protokollerini kullanmak veya uygulamak için gerekli değildir. İsteğe bağlı bir iyileştirme olarak kod oluşturma, yalnızca statik olarak yazılan diller için uygulamaya değer.
Avro veri kaynağı aşağıdakileri destekler:
- Şema dönüştürme: Apache Spark SQL ve Avro kayıtları arasında otomatik dönüştürme.
- Bölümleme: Ek yapılandırma olmadan bölümlenmiş verileri kolayca okuma ve yazma.
- Sıkıştırma: Diske Avro yazarken kullanılacak sıkıştırma. Desteklenen türler ,
snappy
vedeflate
'lerdiruncompressed
. Ayrıca, kullanımdan kaldırma düzeyini de belirtebilirsiniz. - Kayıt adları: ve ile
recordName
parametre eşlemesi geçirerek adı verecordNamespace
ad alanını kaydedin.
Ayrıca bkz . Akış Avro verilerini okuma ve yazma.
Yapılandırma
Çeşitli yapılandırma parametrelerini kullanarak Avro veri kaynağının davranışını değiştirebilirsiniz.
Okurken uzantı olmadan .avro
dosyaları yoksaymak için Hadoop yapılandırmasında parametresini avro.mapred.ignore.inputs.without.extension
ayarlayabilirsiniz. Varsayılan değer: false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Yazarken sıkıştırmayı yapılandırmak için aşağıdaki Spark özelliklerini ayarlayın:
- Sıkıştırma codec'i:
spark.sql.avro.compression.codec
. Desteklenen codec'ler vedeflate
'tirsnappy
. Varsayılan codec bileşenidirsnappy
. - Sıkıştırma codec'i ise
deflate
, sıkıştırma düzeyini şu şekilde ayarlayabilirsiniz:spark.sql.avro.deflate.level
. Varsayılan düzey:-1
.
Bu özellikleri kullanarak küme Spark yapılandırmasında veya çalışma zamanında spark.conf.set()
ayarlayabilirsiniz. Örneğin:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Databricks Runtime 9.1 LTS ve üzeri için, dosyaları okurken seçeneğini sağlayarak mergeSchema
Avro'daki varsayılan şema çıkarımı davranışını değiştirebilirsiniz. ayarı mergeSchema
true
, hedef dizindeki bir Avro dosyaları kümesinden şema çıkartır ve tek bir dosyadan okuma şemasını çıkarsamak yerine bunları birleştirir.
Avro için desteklenen türler -> Spark SQL dönüşümü
Bu kitaplık tüm Avro türlerinin okunmasını destekler. Avro türlerinden Spark SQL türlerine aşağıdaki eşlemeyi kullanır:
Avro türü | Spark SQL türü |
---|---|
boolean | BooleanType |
int | IntegerType |
uzun | LongType |
kayan noktalı sayı | FloatType |
çift | DoubleType |
bayt | BinaryType |
Dize | StringType |
kayıt | StructType |
enum | StringType |
dizi | ArrayType |
map | MapType |
sabit | BinaryType |
birleşim | Bkz. Birleşim türleri. |
Birleşim türleri
Avro veri kaynağı okuma union
türlerini destekler. Avro, aşağıdaki üç türü tür olarak union
kabul eder:
union(int, long)
LongType
ile eşler.union(float, double)
DoubleType
ile eşler.union(something, null)
, buradasomething
desteklenen herhangi bir Avro türüdür. Bu, ile aynı Spark SQL türünesomething
eşlernullable
ve olarak ayarlanırtrue
.
Diğer union
tüm türler karmaşık türlerdir. Alan adlarının, üyelerine union
uygun olarak , member1
vb. olduğu yere eşlenirStructType
.member0
Bu, Avro ile Parquet arasında dönüştürme sırasındaki davranışla tutarlıdır.
Mantıksal türler
Avro veri kaynağı aşağıdaki Avro mantıksal türlerinin okunmasını destekler:
Avro mantıksal türü | Avro türü | Spark SQL türü |
---|---|---|
tarih | int | DateType |
zaman damgası milis | uzun | Zaman Damgası Türü |
zaman damgası mikroları | uzun | Zaman Damgası Türü |
ondalık | sabit | Ondalık Türü |
ondalık | bayt | Ondalık Türü |
Not
Avro veri kaynağı, Avro dosyasında bulunan belgeleri, diğer adları ve diğer özellikleri yoksayar.
Spark SQL için desteklenen türler -> Avro dönüştürme
Bu kitaplık, tüm Spark SQL türlerinin Avro'ya yazılma özelliğini destekler. Çoğu tür için Spark türlerinden Avro türlerine eşleme basittir (örneğin IntegerType
, öğesine dönüştürülür int
); aşağıdaki birkaç özel durum listesidir:
Spark SQL türü | Avro türü | Avro mantıksal türü |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bayt | |
Ondalık Türü | sabit | ondalık |
Zaman Damgası Türü | uzun | zaman damgası mikroları |
DateType | int | tarih |
Spark SQL türlerinin diğer Avro türlerine dönüştürülebilmesi için çıkış Avro şemasının tamamını seçeneğiyle avroSchema
de belirtebilirsiniz.
Aşağıdaki dönüştürmeler varsayılan olarak uygulanmaz ve kullanıcı tarafından belirtilen Avro şemasını gerektirir:
Spark SQL türü | Avro türü | Avro mantıksal türü |
---|---|---|
ByteType | sabit | |
StringType | enum | |
Ondalık Türü | bayt | ondalık |
Zaman Damgası Türü | uzun | zaman damgası milis |
Örnekler
Bu örneklerde episodes.avro dosyası kullanılır.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Bu örnekte özel bir Avro şeması gösterilmektedir:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Bu örnekte Avro sıkıştırma seçenekleri gösterilmektedir:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Bu örnekte bölümlenmiş Avro kayıtları gösterilmektedir:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Bu örnekte kayıt adı ve ad alanı gösterilmektedir:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
SQL'de Avro verilerini sorgulamak için veri dosyasını tablo veya geçici görünüm olarak kaydedin:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Not defteri örneği: Avro dosyalarını okuma ve yazma
Aşağıdaki not defteri Avro dosyalarının nasıl okunduğunu ve yazılıp yazılamını gösterir.