Sdílet prostřednictvím


Soubor Avro

Apache Avro je systém serializace dat. Avro poskytuje:

  • Bohaté datové struktury.
  • Kompaktní, rychlý binární datový formát.
  • Soubor kontejneru pro ukládání trvalých dat.
  • Vzdálené volání procedur (RPC).
  • Jednoduchá integrace s dynamickými jazyky Generování kódu se nevyžaduje ke čtení nebo zápisu datových souborů ani k použití ani implementaci protokolů RPC. Generování kódu jako volitelná optimalizace stojí za to implementovat pouze pro staticky napsané jazyky.

Zdroj dat Avro podporuje:

  • Převod schématu: Automatický převod mezi záznamy Apache Spark SQL a Avro
  • Dělení: Snadné čtení a zápis dělených dat bez další konfigurace.
  • Komprese: Komprese, která se má použít při zápisu Avro na disk. Podporované typy jsou uncompressed, snappya deflate. Můžete také zadat úroveň deflate.
  • Názvy záznamů: Název záznamu a obor názvů předáním mapy parametrů s recordName a recordNamespace.

Viz také čtení a zápis streamovaných dat Avro.

Konfigurace

Chování zdroje dat Avro můžete změnit pomocí různých parametrů konfigurace.

Pokud chcete při čtení ignorovat soubory bez přípony .avro , můžete parametr nastavit avro.mapred.ignore.inputs.without.extension v konfiguraci Hadoopu. Výchozí hodnota je false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Pokud chcete při zápisu nakonfigurovat kompresi, nastavte následující vlastnosti Sparku:

  • Komprese kodeku: spark.sql.avro.compression.codec. Podporované kodeky jsou snappy a deflate. Výchozí kodek je snappy.
  • Pokud je deflatekodek komprese , můžete nastavit úroveň komprese pomocí: spark.sql.avro.deflate.level. Výchozí úroveň je -1.

Tyto vlastnosti můžete nastavit v konfiguraci Sparku clusteru nebo za běhu pomocí spark.conf.set(). Příklad:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Pro Databricks Runtime 9.1 LTS a vyšší můžete změnit výchozí chování odvozování schématu v Avro tím, že při čtení souborů poskytnete mergeSchema možnost. Nastavení mergeSchema pro true odvození schématu ze sady souborů Avro v cílovém adresáři a jejich sloučení místo odvození schématu čtení z jednoho souboru.

Podporované typy pro převod Avro –> Převod Spark SQL

Tato knihovna podporuje čtení všech typů Avro. Používá následující mapování z typů Avro na typy Spark SQL:

Typ Avro Typ Spark SQL
boolean BooleanType
int IntegerType
long LongType
float (číslo s plovoucí řádovou čárkou) FloatType
double DoubleType
bajtů BinaryType
string StringType
záznam Typ struktury
enum StringType
pole ArrayType
map MapType
stabilní BinaryType
sjednocení Viz typy Sjednocení.

Typy sjednocení

Zdroj dat Avro podporuje typy čtení union . Avro považuje následující tři typy za union typy:

  • union(int, long) mapuje na LongType.
  • union(float, double) mapuje na DoubleType.
  • union(something, null), kde something je jakýkoli podporovaný typ Avro. Tato možnost se mapuje na stejný typ Spark SQL jako typ sparku somethings nastaveným parametrem nullable true.

Všechny ostatní union typy jsou komplexní typy. Mapují se na StructType místo, kde jsou member0názvy polí , member1a tak dále, v souladu se členy union. To je konzistentní s chováním při převodu mezi Avro a Parquet.

Logické typy

Zdroj dat Avro podporuje čtení následujících logických typů Avro:

Logický typ Avro Typ Avro Typ Spark SQL
datum int Typ data
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal stabilní DecimalType
decimal bajtů DecimalType

Poznámka:

Zdroj dat Avro ignoruje dokumenty, aliasy a další vlastnosti, které jsou přítomné v souboru Avro.

Podporované typy pro převod Spark SQL –> Avro

Tato knihovna podporuje zápis všech typů Spark SQL do Avro. U většiny typů je mapování typů Sparku na typy Avro jednoduché (například IntegerType se převede na int); následuje seznam několika speciálních případů:

Typ Spark SQL Typ Avro Logický typ Avro
ByteType int
Krátký typ int
BinaryType bajtů
DecimalType stabilní decimal
TimestampType long timestamp-micros
Typ data int datum

Můžete také zadat celé výstupní schéma Avro s možností avroSchema, aby se typy Spark SQL mohly převést na jiné typy Avro. Následující převody se ve výchozím nastavení nepoužívají a vyžadují schéma Avro zadané uživatelem:

Typ Spark SQL Typ Avro Logický typ Avro
ByteType stabilní
StringType enum
DecimalType bajtů decimal
TimestampType long timestamp-millis

Příklady

Tyto příklady používají soubor epizods.avro .

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Tento příklad ukazuje vlastní schéma Avro:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Tento příklad ukazuje možnosti komprese Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Tento příklad ukazuje dělené záznamy Avro:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Tento příklad ukazuje název záznamu a obor názvů:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Pokud chcete dotazovat data Avro v SQL, zaregistrujte datový soubor jako tabulku nebo dočasné zobrazení:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Příklad poznámkového bloku: Čtení a zápis souborů Avro

Následující poznámkový blok ukazuje, jak číst a zapisovat soubory Avro.

Čtení a zápis souborů Avro

Získat poznámkový blok