Compartir a través de


Archivo Avro

Apache Avro es un sistema de serialización de datos. Avro proporciona:

  • Estructuras de datos enriquecidas.
  • Un formato de datos compacto, rápido y binario.
  • Un archivo contenedor para almacenar datos persistentes.
  • Llamada a procedimiento remoto (RPC).
  • Integración sencilla con lenguajes dinámicos. No se necesita generación de código para leer o escribir archivos de datos ni para usar o implementar protocolos RPC. La generación de código como una optimización opcional solo merece la pena implementarse para lenguajes con establecimiento estático de tipos.

El origen de datos de Avro admite:

  • Conversión de esquemas: conversión automática entre Apache Spark SQL y registros Avro.
  • Creación de particiones: lectura y escritura sencillas de datos con particiones sin ninguna configuración adicional.
  • Compresión: compresión que se usa al escribir Avro en el disco. Los tipos admitidos son uncompressed, snappy y deflate. También puede especificar el nivel deflate.
  • Nombres de registro: nombre de registro y espacio de nombres al pasar un mapa de parámetros con recordName y recordNamespace.

Vea también Lectura y escritura de datos de Avro en streaming.

Configuración

Puede cambiar el comportamiento de un origen de datos de Avro mediante varios parámetros de configuración.

Para pasar por alto los archivos sin la extensión .avro al leer, puede establecer el parámetro avro.mapred.ignore.inputs.without.extension en la configuración de Hadoop. El valor predeterminado es false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Para configurar la compresión al escribir, establezca las siguientes propiedades de Spark:

  • Códec de compresión: spark.sql.avro.compression.codec. Los códecs admitidos son snappy y deflate. El códec predeterminado es snappy.
  • Si el códec de compresión es deflate, puede establecer el nivel de compresión con: spark.sql.avro.deflate.level. El nivel predeterminado es -1.

Puede establecer estas propiedades en la configuración de Spark del clúster o en tiempo de ejecución mediante spark.conf.set(). Por ejemplo:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Para Databricks Runtime 9.1 LTS y versiones posteriores, puede cambiar el comportamiento de inferencia de esquema predeterminado en Avro proporcionando la mergeSchema opción al leer archivos. Al establecer mergeSchema en true, se infiere un esquema de un conjunto de archivos Avro del directorio de destino y se combinan en lugar de inferirse el esquema de lectura de un único archivo.

Tipos admitidos para Avro -> conversión de Spark SQL

Esta biblioteca admite la lectura de todos los tipos Avro. Usa la siguiente asignación de tipos Avro a tipos Spark SQL:

Tipo Avro Tipo Spark SQL
booleano Tipo Booleano
Int IntegerType
largo LongType
FLOAT Tipo de flotación
doble DoubleType
Bytes TipoBinario
cuerda / cadena tipo de cadena
registro StructType
enumeración tipo de cadena
arreglo TipoDeArreglo
mapa Tipo de Mapa
corregido TipoBinario
unión Consulte Tipos de unión.

Tipos de unión

El origen de datos Avro admite tipos union de lectura. Avro considera que los tres tipos siguientes son tipos union:

  • union(int, long) se asigna a LongType.
  • union(float, double) se asigna a DoubleType.
  • union(something, null), donde something es cualquier tipo Avro compatible. Esto se asigna al mismo tipo Spark SQL que el de something, con nullable establecido en true.

Todos los demás tipos union son tipos complejos. Se asignan a StructType, donde los nombres de campo son member0, member1, y así sucesivamente, de acuerdo con los miembros de union. Esto es coherente con el comportamiento al convertir entre Avro y Parquet.

Tipos lógicos

El origen de datos avro admite la lectura de los siguientes tipos lógicos de Avro:

Tipo lógico Avro Tipo Avro Tipo Spark SQL
fecha Int TipoFecha
marca de tiempo-millis largo Tipo de marca de tiempo
micros de marca de tiempo largo Tipo de marca de tiempo
Decimal corregido DecimalType
Decimal Bytes DecimalType

Nota:

El origen de datos Avro omite los documentos, alias y otras propiedades presentes en el archivo Avro.

Tipos admitidos para la conversión de Spark SQL -> Avro

Esta biblioteca admite la escritura de todos los tipos Spark SQL en Avro. Para la mayoría de los tipos, la asignación de tipos de Spark a tipos Avro es sencilla (por ejemplo, IntegerType se convierte en int); la siguiente es una lista de los pocos casos especiales:

Tipo Spark SQL Tipo Avro Tipo lógico Avro
Tipo de Byte Int
ShortType Int
TipoBinario Bytes
DecimalType corregido Decimal
Tipo de marca de tiempo largo micros de marca de tiempo
TipoFecha Int fecha

También puede especificar el esquema Avro de salida completo con la opción avroSchema, de modo que los tipos Spark SQL se puedan convertir en otros tipos Avro. Las conversiones siguientes no se aplican de manera predeterminada y requieren el esquema Avro especificado por el usuario:

Tipo Spark SQL Tipo Avro Tipo lógico Avro
Tipo de Byte corregido
tipo de cadena enumeración
DecimalType Bytes Decimal
Tipo de marca de tiempo largo marca de tiempo-millis

Ejemplos

Estos ejemplos usan el archivo episodes.avro .

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

En este ejemplo se muestra un esquema de Avro personalizado:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

En este ejemplo se muestran las opciones de compresión de Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

En este ejemplo se muestran registros de Avro con particiones:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

En este ejemplo se muestran el nombre de registro y el espacio de nombres:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Pitón

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Para consultar datos de Avro en SQL, registre el archivo de datos como una tabla o una vista temporal:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Ejemplo de cuaderno: lectura y escritura de archivos Avro

En el cuaderno siguiente se muestra cómo leer y escribir archivos Avro.

Lectura y escritura en el cuaderno de archivos Avro

Obtener cuaderno