Archivo Avro

Apache Avro es un sistema de serialización de datos. Avro proporciona:

  • Estructuras de datos enriquecidas.
  • Un formato de datos compacto, rápido y binario.
  • Un archivo contenedor para almacenar datos persistentes.
  • Llamada a procedimiento remoto (RPC).
  • Integración sencilla con lenguajes dinámicos. No se necesita generación de código para leer o escribir archivos de datos ni para usar o implementar protocolos RPC. La generación de código como una optimización opcional solo merece la pena implementarse para lenguajes con establecimiento estático de tipos.

El origen de datos Avro admite:

  • Conversión de esquemas: conversión automática entre Apache Spark SQL y registros Avro.
  • Creación de particiones: lectura y escritura sencillas de datos con particiones sin ninguna configuración adicional.
  • Compresión: compresión que se usa al escribir Avro en el disco. Los tipos admitidos son uncompressed, snappy y deflate. También puede especificar el nivel deflate.
  • Nombres de registro: nombre de registro y espacio de nombres al pasar un mapa de parámetros con recordName y recordNamespace.

Vea también Lectura y escritura de datos de Avro en streaming.

Configuración

Puede cambiar el comportamiento de un origen de datos de Avro mediante varios parámetros de configuración.

Para pasar por alto los archivos sin la extensión .avro al leer, puede establecer el parámetro avro.mapred.ignore.inputs.without.extension en la configuración de Hadoop. El valor predeterminado es false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Para configurar la compresión al escribir, establezca las siguientes propiedades de Spark:

  • Códec de compresión: spark.sql.avro.compression.codec. Los códecs admitidos son snappy y deflate. El códec predeterminado es snappy.
  • Si el códec de compresión es deflate, puede establecer el nivel de compresión con: spark.sql.avro.deflate.level. El nivel predeterminado es -1.

Puede establecer estas propiedades en la configuración de Spark del clúster o en tiempo de ejecución mediante spark.conf.set(). Por ejemplo:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

En Databricks Runtime 9.1 LTS y posteriores, puede cambiar el comportamiento de inferencia de esquemas predeterminado de Avro si proporciona la opción mergeSchema al leer archivos. Al establecer mergeSchema en true, se infiere un esquema de un conjunto de archivos Avro del directorio de destino y se combinan en lugar de inferirse el esquema de lectura de un único archivo.

Tipos admitidos para Avro -> conversión de Spark SQL

Esta biblioteca admite la lectura de todos los tipos Avro. Usa la siguiente asignación de tipos Avro a tipos Spark SQL:

Tipo Avro Tipo Spark SQL
boolean BooleanType
int IntegerType
long LongType
FLOAT FloatType
double DoubleType
bytes BinaryType
string StringType
registro StructType
enum StringType
array ArrayType
map MapType
corregido BinaryType
union Vea Tipos de unión.

Tipos de unión

El origen de datos Avro admite tipos union de lectura. Avro considera que los tres tipos siguientes son tipos union:

  • union(int, long) se asigna a LongType.
  • union(float, double) se asigna a DoubleType.
  • union(something, null), donde something es cualquier tipo Avro compatible. Esto se asigna al mismo tipo Spark SQL que el de something, con nullable establecido en true.

Todos los demás tipos union son tipos complejos. Se asignan a StructType, donde los nombres de campo son member0, member1, y así sucesivamente, de acuerdo con los miembros de union. Esto es coherente con el comportamiento al convertir entre Avro y Parquet.

Tipos lógicos

El origen de datos Avro admite la lectura de los siguientes tipos lógicos de Avro:

Tipo lógico Avro Tipo Avro Tipo Spark SQL
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
Decimal corregido DecimalType
Decimal bytes DecimalType

Nota:

El origen de datos Avro omite los documentos, alias y otras propiedades presentes en el archivo Avro.

Tipos admitidos para la conversión de Spark SQL -> Avro

Esta biblioteca admite la escritura de todos los tipos Spark SQL en Avro. Para la mayoría de los tipos, la asignación de tipos de Spark a tipos Avro es sencilla (por ejemplo, IntegerType se convierte en int); la siguiente es una lista de los pocos casos especiales:

Tipo Spark SQL Tipo Avro Tipo lógico Avro
ByteType int
ShortType int
BinaryType bytes
DecimalType corregido Decimal
TimestampType long timestamp-micros
DateType int date

También puede especificar el esquema Avro de salida completo con la opción avroSchema, de modo que los tipos Spark SQL se puedan convertir en otros tipos Avro. Las conversiones siguientes no se aplican de manera predeterminada y requieren el esquema Avro especificado por el usuario:

Tipo Spark SQL Tipo Avro Tipo lógico Avro
ByteType corregido
StringType enum
DecimalType bytes Decimal
TimestampType long timestamp-millis

Ejemplos

En estos ejemplos se usa el archivo episodes.avro.

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

En este ejemplo se muestra un esquema de Avro personalizado:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

En este ejemplo se muestran las opciones de compresión de Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

En este ejemplo se muestran registros de Avro con particiones:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

En este ejemplo se muestran el nombre de registro y el espacio de nombres:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Para consultar datos de Avro en SQL, registre el archivo de datos como una tabla o una vista temporal:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Ejemplo de cuaderno: lectura y escritura de archivos Avro

En el cuaderno siguiente se muestra cómo leer y escribir archivos Avro.

Lectura y escritura en el cuaderno de archivos Avro

Obtener el cuaderno