Avro-fil

Apache Avro är ett data serialiseringssystem. Avro tillhandahåller:

  • Omfattande datastrukturer.
  • Ett kompakt, snabbt, binärt dataformat.
  • En containerfil för att lagra beständiga data.
  • Fjärrproceduranrop (RPC).
  • Enkel integrering med dynamiska språk. Kodgenerering krävs inte för att läsa eller skriva datafiler eller för att använda eller implementera RPC-protokoll. Kodgenerering som en valfri optimering, bara värt att implementera för statiskt skrivna språk.

Avro-datakällan stöder:

  • Schemakonvertering: Automatisk konvertering mellan Apache Spark SQL- och Avro-poster.
  • Partitionering: Läs och skriv enkelt partitionerade data utan extra konfiguration.
  • Komprimering: Komprimering som ska användas när Avro skrivs ut till disk. De typer som stöds är uncompressed, snappyoch deflate. Du kan också ange deflate-nivån.
  • Postnamn: Postnamn och namnområde genom att skicka en karta över parametrar med recordName och recordNamespace.

Se även Läsa och skriva strömmande Avro-data.

Konfiguration

Du kan ändra beteendet för en Avro-datakälla med hjälp av olika konfigurationsparametrar.

Om du vill ignorera filer utan .avro tillägget när du läser kan du ange parametern avro.mapred.ignore.inputs.without.extension i Hadoop-konfigurationen. Standardvärdet är false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Konfigurera komprimering när du skriver genom att ange följande Spark-egenskaper:

  • Komprimeringskod: spark.sql.avro.compression.codec. Codecs som stöds är snappy och deflate. Standardkodcen är snappy.
  • Om komprimeringskodc är deflatekan du ange komprimeringsnivån med: spark.sql.avro.deflate.level. Standardnivån är -1.

Du kan ange dessa egenskaper i Spark-klustrets konfiguration eller vid körning med hjälp av spark.conf.set(). Till exempel:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

För Databricks Runtime 9.1 LTS och senare kan du ändra standardbeteendet för schemainferens i Avro genom att ange mergeSchema alternativet när du läser filer. true Om du anger mergeSchema till härleds ett schema från en uppsättning Avro-filer i målkatalogen och sammanfogar dem i stället för att härleda lässchemat från en enda fil.

Typer som stöds för Avro –> Spark SQL-konvertering

Det här biblioteket stöder läsning av alla Avro-typer. Den använder följande mappning från Avro-typer till Spark SQL-typer:

Avro-typ Spark SQL-typ
boolean BooleanType
heltal IntegerType
lång LongType
flyttal FloatType
dubbel DoubleType
byte BinaryType
sträng StringType
Post StructType
uppräkning StringType
matris ArrayType
map MapType
Fast BinaryType
union Se Union-typer.

Union-typer

Avro-datakällan stöder lästyper union . Avro anser att följande tre typer är union typer:

  • union(int, long) mappar till LongType.
  • union(float, double) mappar till DoubleType.
  • union(something, null), där something finns någon Avro-typ som stöds. Detta mappar till samma Spark SQL-typ som för something, med nullable inställt på true.

Alla andra union typer är komplexa typer. De mappar till StructType var fältnamnen är member0, member1och så vidare, i enlighet med medlemmarna unioni . Detta överensstämmer med beteendet vid konvertering mellan Avro och Parquet.

Logiska typer

Avro-datakällan stöder läsning av följande logiska Avro-typer:

Logisk avro-typ Avro-typ Spark SQL-typ
datum heltal DateType
timestamp-millis lång Tidsstämpeltyp
timestamp-micros lång Tidsstämpeltyp
decimal Fast Decimaltyp
decimal byte Decimaltyp

Kommentar

Avro-datakällan ignorerar dokument, alias och andra egenskaper som finns i Avro-filen.

Typer som stöds för Spark SQL –> Avro-konvertering

Det här biblioteket stöder skrivning av alla Spark SQL-typer till Avro. För de flesta typer är mappningen från Spark-typer till Avro-typer enkel (till exempel IntegerType konverteras till int); följande är en lista över de få specialfallen:

Spark SQL-typ Avro-typ Logisk avro-typ
ByteType heltal
ShortType heltal
BinaryType byte
Decimaltyp Fast decimal
Tidsstämpeltyp lång timestamp-micros
DateType heltal datum

Du kan också ange hela Avro-schemat med alternativet avroSchema, så att Spark SQL-typer kan konverteras till andra Avro-typer. Följande konverteringar tillämpas inte som standard och kräver användarens angivna Avro-schema:

Spark SQL-typ Avro-typ Logisk avro-typ
ByteType Fast
StringType uppräkning
Decimaltyp byte decimal
Tidsstämpeltyp lång timestamp-millis

Exempel

I de här exemplen används filen episodes.avro .

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Det här exemplet visar ett anpassat Avro-schema:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Det här exemplet visar Alternativ för Avro-komprimering:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Det här exemplet visar partitionerade Avro-poster:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Det här exemplet visar postnamnet och namnområdet:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Om du vill köra frågor mot Avro-data i SQL registrerar du datafilen som en tabell eller temporär vy:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Notebook-exempel: Läsa och skriva Avro-filer

Följande notebook-fil visar hur du läser och skriver Avro-filer.

Läsa och skriva Anteckningsbok för Avro-filer

Hämta notebook-fil