Aracılığıyla paylaş


Avro dosyası

Apache Avro bir veri serileştirme sistemidir. Avro şu bilgileri sağlar:

  • Zengin veri yapıları.
  • Kompakt, hızlı, ikili veri biçimi.
  • Kalıcı verileri depolamak için bir kapsayıcı dosyası.
  • Uzaktan yordam çağrısı (RPC).
  • Dinamik dillerle basit tümleştirme. Kod oluşturma, veri dosyalarını okumak veya yazmak ya da RPC protokollerini kullanmak veya uygulamak için gerekli değildir. İsteğe bağlı bir iyileştirme olarak kod oluşturma, yalnızca statik olarak yazılan diller için uygulamaya değer.

Avro veri kaynağı aşağıdakileri destekler:

  • Şema dönüştürme: Apache Spark SQL ve Avro kayıtları arasında otomatik dönüştürme.
  • Bölümleme: Ek yapılandırma olmadan bölümlenmiş verileri kolayca okuma ve yazma.
  • Sıkıştırma: Diske Avro yazarken kullanılacak sıkıştırma. Desteklenen türler , uncompressedve snappy'lerdirdeflate. Ayrıca, kullanımdan kaldırma düzeyini de belirtebilirsiniz.
  • Kayıt adları: recordName ve recordNamespaceile parametrelerin bir haritasını kullanarak ad ve ad alanını belirtin.

Ayrıca bkz . Akış Avro verilerini okuma ve yazma.

Yapılandırma

Çeşitli yapılandırma parametrelerini kullanarak Avro veri kaynağının davranışını değiştirebilirsiniz.

Okurken .avro uzantısı olmayan dosyaları yoksaymak için, Hadoop yapılandırmasında avro.mapred.ignore.inputs.without.extension parametresini ayarlayabilirsiniz. Varsayılan değer: false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Yazarken sıkıştırmayı yapılandırmak için aşağıdaki Spark özelliklerini ayarlayın:

  • Sıkıştırma codec'i: spark.sql.avro.compression.codec. Desteklenen codec'ler ve snappy'tirdeflate. Varsayılan codec bileşenidir snappy.
  • Sıkıştırma codec'i deflateise, sıkıştırma düzeyini şu şekilde ayarlayabilirsiniz: spark.sql.avro.deflate.level. Varsayılan düzey: -1.

Bu özellikleri kümesinde Spark Yapılandırması veya çalışma zamanında spark.conf.set()kullanarak ayarlayabilirsiniz. Örneğin:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Databricks Runtime 9.1 LTS ve üzeri için, dosyaları okurken seçeneğini sağlayarak Avro'daki varsayılan şema çıkarımı davranışını değiştirebilirsiniz. mergeSchema true olarak ayarlandığında, hedef dizindeki bir dizi Avro dosyasından şema çıkarılır ve tek bir dosyadan okuma şeması çıkarmak yerine bu şemalar birleştirilir.

Avro için desteklenen türler -> Spark SQL dönüşümü

Bu kitaplık tüm Avro türlerinin okunmasını destekler. Avro türlerinden Spark SQL türlerine aşağıdaki eşlemeyi kullanır:

Avro türü Spark SQL türü
Boolean Boolean türü
Int TamsayıTipi
uzun UzunTip
kayan noktalı sayı YüzenTip
çift ÇiftTip
bayt İkiliTip
Dize DizeTürü
kayıt YapıTürü
numaralandırma DizeTürü
dizi DiziTürü
harita Harita Türü
sabit İkiliTip
birleşim Bkz. Birleşim türleri.

Birleşim türleri

Avro veri kaynağı okuma union türlerini destekler. Avro, aşağıdaki üç türü tür olarak union kabul eder:

  • union(int, long) LongTypeile eşler.
  • union(float, double) DoubleTypeile eşler.
  • union(something, null), burada something desteklenen herhangi bir Avro türüdür. Bu, somethingile aynı Spark SQL türüne eşler ve nullabletrueolarak ayarlanır.

Diğer union tüm türler karmaşık türlerdir. Alan adları StructType, member0vb. olduğunda, member1üyeleriyle uyumlu olarak union'a eşlenir. Bu, Avro ile Parquet arasında dönüştürme sırasındaki davranışla tutarlıdır.

Mantıksal türler

Avro veri kaynağı aşağıdaki Avro mantıksal türlerinin okunmasını destekler:

Avro mantıksal türü Avro türü Spark SQL türü
tarih Int TarihTürü
zaman damgası milis uzun Zaman Damgası Türü
zaman damgası mikroları uzun Zaman Damgası Türü
ondalık sabit Ondalık Türü
ondalık bayt Ondalık Türü

Not

Avro veri kaynağı, Avro dosyasında bulunan belgeleri, diğer adları ve diğer özellikleri yoksayar.

Spark SQL için desteklenen türler -> Avro dönüştürme

Bu kitaplık, tüm Spark SQL türlerinin Avro'ya yazılma özelliğini destekler. Çoğu tür için Spark türlerinden Avro türlerine eşleme basittir (örneğin IntegerTypeintdönüştürülür); aşağıdaki birkaç özel durum listesidir:

Spark SQL türü Avro türü Avro mantıksal türü
BaytTürü Int
KısaTip Int
İkiliTip bayt
Ondalık Türü sabit ondalık
Zaman Damgası Türü uzun zaman damgası mikroları
TarihTürü Int tarih

Spark SQL türlerinin diğer Avro türlerine dönüştürülebilmesi için çıkış Avro şemasının tamamını avroSchemaseçeneğiyle de belirtebilirsiniz. Aşağıdaki dönüştürmeler varsayılan olarak uygulanmaz ve kullanıcı tarafından belirtilen Avro şemasını gerektirir:

Spark SQL türü Avro türü Avro mantıksal türü
ByteType sabit
DizeTürü numaralandırma
Ondalık Türü bayt ondalık
Zaman Damgası Türü uzun zaman damgası milis

Örnekler

Bu örneklerde episodes.avro dosyası kullanılır.

Scala programlama dili

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Bu örnekte özel bir Avro şeması gösterilmektedir:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Bu örnekte Avro sıkıştırma seçenekleri gösterilmektedir:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Bu örnekte bölümlenmiş Avro kayıtları gösterilmektedir:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Bu örnekte kayıt adı ve ad alanı gösterilmektedir:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Piton

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

SQL'de Avro verilerini sorgulamak için veri dosyasını tablo veya geçici görünüm olarak kaydedin:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Not defteri örneği: Avro dosyalarını okuma ve yazma

Aşağıdaki not defteri Avro dosyalarının nasıl okunduğunu ve yazılıp yazılamını gösterir.

Avro dosyaları not defterini okuma ve yazma

Dizüstü bilgisayar al