File Avro

Apache Avro è un sistema di serializzazione dei dati. Avro fornisce:

  • Strutture di dati avanzate.
  • Formato di dati binario compatto, veloce e veloce.
  • Un file contenitore per archiviare i dati persistenti.
  • Chiamata di procedura remota (RPC).
  • Integrazione semplice con linguaggi dinamici. La generazione del codice non è necessaria per leggere o scrivere file di dati né per usare o implementare protocolli RPC. Generazione di codice come ottimizzazione facoltativa, vale la pena implementare solo per i linguaggi tipizzato in modo statico.

L'origine dati Avro supporta:

  • Conversione dello schema: conversione automatica tra i record APACHE Spark SQL e Avro.
  • Partizionamento: leggere e scrivere facilmente dati partizionati senza alcuna configurazione aggiuntiva.
  • Compressione: compressione da usare durante la scrittura di Avro su disco. I tipi supportati sono uncompressed, snappye deflate. È anche possibile specificare il livello di deflate.
  • Nomi dei record: nome e spazio dei nomi dei record passando una mappa di parametri con recordName e recordNamespace.

Vedere anche Leggere e scrivere dati Avro in streaming.

Impostazione

È possibile modificare il comportamento di un'origine dati Avro usando vari parametri di configurazione.

Per ignorare i file senza l'estensione durante la .avro lettura, è possibile impostare il parametro avro.mapred.ignore.inputs.without.extension nella configurazione di Hadoop. Il valore predefinito è false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Per configurare la compressione durante la scrittura, impostare le proprietà di Spark seguenti:

  • Codec di compressione: spark.sql.avro.compression.codec. I codec supportati sono snappy e deflate. Il codec predefinito è snappy.
  • Se il codec di compressione è deflate, è possibile impostare il livello di compressione con : spark.sql.avro.deflate.level. Il livello predefinito è -1.

È possibile impostare queste proprietà nella configurazione spark del cluster o in fase di esecuzione usando spark.conf.set(). Ad esempio:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Per Databricks Runtime 9.1 LTS e versioni successive, è possibile modificare il comportamento di inferenza dello schema predefinito in Avro fornendo l'opzione durante la mergeSchema lettura dei file. L'impostazione di mergeSchema su true dedurrà uno schema da un set di file Avro nella directory di destinazione e li unisce anziché dedurre lo schema di lettura da un singolo file.

Tipi supportati per Avro -> Conversione spark SQL

Questa libreria supporta la lettura di tutti i tipi Avro. Usa il mapping seguente dai tipi Avro ai tipi Spark SQL:

Tipo Avro Tipo SQL Spark
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
bytes BinaryType
string StringType
registra StructType
enum StringType
array ArrayType
mappa MapType
fixed BinaryType
union Vedere Tipi di unione.

Tipi unione

L'origine dati Avro supporta i tipi di lettura union . Avro considera i tre tipi union seguenti:

  • union(int, long) esegue il mapping a LongType.
  • union(float, double) esegue il mapping a DoubleType.
  • union(something, null), dove something è qualsiasi tipo Avro supportato. Viene eseguito il mapping allo stesso tipo DI SPARK SQL di something, con nullable impostato su true.

Tutti gli altri union tipi sono tipi complessi. Eseguono il mapping a dove i nomi dei campi sono member0, member1e così via, in base ai StructType membri dell'oggetto union. Questo comportamento è coerente con il comportamento durante la conversione tra Avro e Parquet.

Tipi logici

L'origine dati Avro supporta la lettura dei tipi logici Avro seguenti:

Tipo logico Avro Tipo Avro Tipo SQL Spark
data int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimale fixed DecimalType
decimale bytes DecimalType

Nota

L'origine dati Avro ignora documenti, alias e altre proprietà presenti nel file Avro.

Tipi supportati per Spark SQL -> Conversione Avro

Questa libreria supporta la scrittura di tutti i tipi Spark SQL in Avro. Per la maggior parte dei tipi, il mapping dai tipi Spark ai tipi Avro è semplice (ad esempio IntegerType viene convertito in int); di seguito è riportato un elenco dei pochi casi speciali:

Tipo SQL Spark Tipo Avro Tipo logico Avro
ByteType int
ShortType int
BinaryType bytes
DecimalType fixed decimale
TimestampType long timestamp-micros
DateType int data

È anche possibile specificare l'intero schema Avro di output con l'opzione avroSchema, in modo che i tipi SPARK SQL possano essere convertiti in altri tipi Avro. Le conversioni seguenti non vengono applicate per impostazione predefinita e richiedono lo schema Avro specificato dall'utente:

Tipo SQL Spark Tipo Avro Tipo logico Avro
ByteType fixed
StringType enum
DecimalType bytes decimale
TimestampType long timestamp-millis

Esempi

Questi esempi usano il file episodes.avro .

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Questo esempio illustra uno schema Avro personalizzato:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Questo esempio illustra le opzioni di compressione Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Questo esempio illustra i record Avro partizionati:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Questo esempio illustra il nome del record e lo spazio dei nomi:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Per eseguire query sui dati avro in SQL, registrare il file di dati come tabella o vista temporanea:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Esempio di notebook: Leggere e scrivere file Avro

Il notebook seguente illustra come leggere e scrivere file Avro.

Leggere e scrivere il notebook dei file Avro

Ottenere il notebook