Avro-Datei

Apache Avro ist ein Datenserialisierungssystem. Avro bietet:

  • Umfangreiche Datenstrukturen.
  • Ein kompaktes, schnelles binäres Datenformat.
  • Eine Containerdatei zum Speichern persistenter Daten.
  • Remoteprozeduraufruf (RPC).
  • Einfache Integration in dynamische Sprachen. Die Codegenerierung ist weder zum Lesen oder Schreiben von Datendateien noch zum Verwenden oder Implementieren von RPC-Protokollen erforderlich. Codegenerierung als optionale Optimierung, deren Implementierung nur für statisch typisierte Sprachen sinnvoll ist.

Die Avro-Datenquelle unterstützt Folgendes:

  • Schemakonvertierung: Automatische Konvertierung zwischen Apache Spark SQL- und Avro-Datensätzen.
  • Partitionierung: Einfaches Lesen und Schreiben partitionierter Daten ohne zusätzliche Konfiguration.
  • Komprimierung: Komprimierung, die beim Schreiben von Avro auf den Datenträger verwendet werden soll. Die unterstützten Typen sind uncompressed, snappy und deflate. Sie können auch die Deflate-Ebene angeben.
  • Datensatznamen: Datensatzname und Namespace durch Übergabe einer Zuordnung von Parametern mit recordName und recordNamespace.

Weitere Informationen finden Sie auch unter Lesen und Schreiben von Avro-Streamingdaten.

Konfiguration

Sie können das Verhalten einer Avro-Datenquelle mithilfe verschiedener Konfigurationsparameter ändern.

Um Dateien ohne die Erweiterung .avro beim Lesen zu ignorieren, können Sie den Parameter avro.mapred.ignore.inputs.without.extension in der Hadoop-Konfiguration festlegen. Der Standardwert lautet false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Legen Sie die folgenden Spark-Eigenschaften fest, um die Komprimierung beim Schreiben zu konfigurieren:

  • Komprimierungscodec: spark.sql.avro.compression.codec. Unterstützte Codecs sind snappy und deflate. Der Standardcodec ist snappy.
  • Wenn der Komprimierungscodec deflate ist, können Sie die Komprimierungsebene mit spark.sql.avro.deflate.level festlegen. Die Standardebene ist -1.

Sie können diese Eigenschaften in der Spark-Konfiguration des Clusters oder zur Laufzeit mit spark.conf.set() festlegen. Beispiele:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Für Databricks Runtime 9.1 LTS und höher können Sie das standardmäßige Schemarückschlussverhalten in Avro ändern, indem Sie die Option mergeSchema beim Lesen von Dateien bereitstellen. Wenn Sie mergeSchema auf true festlegen, wird ein Schema aus einem Satz von Avro-Dateien im Zielverzeichnis abgeleitet und zusammengeführt, anstatt das Leseschema aus einer einzelnen Datei abzuleiten.

Unterstützte Typen für Avro –> Spark-SQL-Konvertierung

Diese Bibliothek unterstützt das Lesen aller Avro-Typen. Dabei wird die folgende Zuordnung von Avro-Typen zu Spark-SQL-Typen verwendet:

Avro-Typ Spark SQL-Typ
boolean BooleanType
int IntegerType
lang LongType
float FloatType
double DoubleType
Byte BinaryType
Zeichenfolge StringType
record StructType
enum StringType
array ArrayType
map MapType
behoben BinaryType
union Siehe Union-Typen.

Union-Datentypen

Die Avro-Datenquelle unterstützt das Lesen von union-Typen. Avro betrachtet die folgenden drei Typen als union-Typen:

  • union(int, long) ist LongType zugeordnet.
  • union(float, double) ist DoubleType zugeordnet.
  • union(something, null), wobei something ein beliebiger unterstützter Avro-Typ ist. Dies wird demselben Spark SQL-Typ zugeordnet wie der von something, wobei nullable auf true festgelegt ist.

Alle anderen union-Typen sind komplexe Typen. Sie werden StructType zugeordnet, wobei die Feldnamen in Übereinstimmung mit den Mitgliedern von unionmember0, member1 usw. lauten. Dies entspricht dem Verhalten beim Konvertieren zwischen Avro und Parquet.

Logische Typen

Die Avro-Datenquelle unterstützt das Lesen der folgenden logischen Avro-Typen:

Logischer Avro-Typ Avro-Typ Spark SQL-Typ
date int DateType
timestamp-millis lang TimestampType
timestamp-micros lang TimestampType
Decimal behoben DecimalType
Decimal Byte DecimalType

Hinweis

Die Avro-Datenquelle ignoriert Dokumente, Aliase und andere Eigenschaften, die in der Avro-Datei vorhanden sind.

Unterstützte Typen für Spark SQL – Konvertierung von > Avro

Diese Bibliothek unterstützt das Schreiben aller Spark SQL-Typen in Avro. Bei den meisten Typen ist die Zuordnung von Spark-Typen zu Avro-Typen unkompliziert (z. B. wird IntegerType in int konvertiert). Im Folgenden finden Sie eine Liste der wenigen Sonderfälle:

Spark SQL-Typ Avro-Typ Logischer Avro-Typ
ByteType int
ShortType int
BinaryType Byte
DecimalType behoben Decimal
TimestampType lang timestamp-micros
DateType int date

Sie können auch das gesamte Avro-Ausgabeschema mit der Option avroSchema festlegen, damit Spark SQL-Typen in andere Avro-Typen konvertiert werden können. Die folgenden Konvertierungen werden standardmäßig nicht angewendet und erfordern ein vom Benutzer angegebenes Avro-Schema:

Spark SQL-Typ Avro-Typ Logischer Avro-Typ
ByteType behoben
StringType enum
DecimalType Byte Decimal
TimestampType lang timestamp-millis

Beispiele

In diesen Beispielen wird die Datei episodes.avro verwendet.

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Dieses Beispiel veranschaulicht ein benutzerdefiniertes Avro-Schema:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Dieses Beispiel veranschaulicht Avro-Komprimierungsoptionen:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Dieses Beispiel veranschaulicht partitionierte Avro-Datensätze:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Dieses Beispiel veranschaulicht den Datensatznamen und den Namespace:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Um Avro-Daten in SQL abfragen zu können, registrieren Sie die Datendatei als Tabelle oder temporäre Ansicht:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Notebookbeispiel: Lesen und Schreiben von Avro-Dateien

Das folgende Notebook veranschaulicht das Lesen und Schreiben von Avro-Dateien.

Notebook zum Lesen und Schreiben von Avro-Dateien

Notebook abrufen