Archivo Avro
Apache Avro es un sistema de serialización de datos. Avro proporciona:
- Estructuras de datos enriquecidas.
- Un formato de datos compacto, rápido y binario.
- Un archivo contenedor para almacenar datos persistentes.
- Llamada a procedimiento remoto (RPC).
- Integración sencilla con lenguajes dinámicos. No se necesita generación de código para leer o escribir archivos de datos ni para usar o implementar protocolos RPC. La generación de código como una optimización opcional solo merece la pena implementarse para lenguajes con establecimiento estático de tipos.
El origen de datos Avro admite:
- Conversión de esquemas: conversión automática entre Apache Spark SQL y registros Avro.
- Creación de particiones: lectura y escritura sencillas de datos con particiones sin ninguna configuración adicional.
- Compresión: compresión que se usa al escribir Avro en el disco. Los tipos admitidos son
uncompressed
,snappy
ydeflate
. También puede especificar el nivel deflate. - Nombres de registro: nombre de registro y espacio de nombres al pasar un mapa de parámetros con
recordName
yrecordNamespace
.
Vea también Lectura y escritura de datos de Avro en streaming.
Configuración
Puede cambiar el comportamiento de un origen de datos de Avro mediante varios parámetros de configuración.
Para pasar por alto los archivos sin la extensión .avro
al leer, puede establecer el parámetro avro.mapred.ignore.inputs.without.extension
en la configuración de Hadoop. El valor predeterminado es false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Para configurar la compresión al escribir, establezca las siguientes propiedades de Spark:
- Códec de compresión:
spark.sql.avro.compression.codec
. Los códecs admitidos sonsnappy
ydeflate
. El códec predeterminado essnappy
. - Si el códec de compresión es
deflate
, puede establecer el nivel de compresión con:spark.sql.avro.deflate.level
. El nivel predeterminado es-1
.
Puede establecer estas propiedades en la configuración de Spark del clúster o en tiempo de ejecución mediante spark.conf.set()
. Por ejemplo:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
En Databricks Runtime 9.1 LTS y posteriores, puede cambiar el comportamiento de inferencia de esquemas predeterminado de Avro si proporciona la opción mergeSchema
al leer archivos. Al establecer mergeSchema
en true
, se infiere un esquema de un conjunto de archivos Avro del directorio de destino y se combinan en lugar de inferirse el esquema de lectura de un único archivo.
Tipos admitidos para Avro -> conversión de Spark SQL
Esta biblioteca admite la lectura de todos los tipos Avro. Usa la siguiente asignación de tipos Avro a tipos Spark SQL:
Tipo Avro | Tipo Spark SQL |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
FLOAT | FloatType |
double | DoubleType |
bytes | BinaryType |
string | StringType |
registro | StructType |
enum | StringType |
array | ArrayType |
map | MapType |
corregido | BinaryType |
union | Vea Tipos de unión. |
Tipos de unión
El origen de datos Avro admite tipos union
de lectura. Avro considera que los tres tipos siguientes son tipos union
:
union(int, long)
se asigna aLongType
.union(float, double)
se asigna aDoubleType
.union(something, null)
, dondesomething
es cualquier tipo Avro compatible. Esto se asigna al mismo tipo Spark SQL que el desomething
, connullable
establecido entrue
.
Todos los demás tipos union
son tipos complejos. Se asignan a StructType
, donde los nombres de campo son member0
, member1
, y así sucesivamente, de acuerdo con los miembros de union
. Esto es coherente con el comportamiento al convertir entre Avro y Parquet.
Tipos lógicos
El origen de datos Avro admite la lectura de los siguientes tipos lógicos de Avro:
Tipo lógico Avro | Tipo Avro | Tipo Spark SQL |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
Decimal | corregido | DecimalType |
Decimal | bytes | DecimalType |
Nota:
El origen de datos Avro omite los documentos, alias y otras propiedades presentes en el archivo Avro.
Tipos admitidos para la conversión de Spark SQL -> Avro
Esta biblioteca admite la escritura de todos los tipos Spark SQL en Avro. Para la mayoría de los tipos, la asignación de tipos de Spark a tipos Avro es sencilla (por ejemplo, IntegerType
se convierte en int
); la siguiente es una lista de los pocos casos especiales:
Tipo Spark SQL | Tipo Avro | Tipo lógico Avro |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DecimalType | corregido | Decimal |
TimestampType | long | timestamp-micros |
DateType | int | date |
También puede especificar el esquema Avro de salida completo con la opción avroSchema
, de modo que los tipos Spark SQL se puedan convertir en otros tipos Avro.
Las conversiones siguientes no se aplican de manera predeterminada y requieren el esquema Avro especificado por el usuario:
Tipo Spark SQL | Tipo Avro | Tipo lógico Avro |
---|---|---|
ByteType | corregido | |
StringType | enum | |
DecimalType | bytes | Decimal |
TimestampType | long | timestamp-millis |
Ejemplos
En estos ejemplos se usa el archivo episodes.avro.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
En este ejemplo se muestra un esquema de Avro personalizado:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
En este ejemplo se muestran las opciones de compresión de Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
En este ejemplo se muestran registros de Avro con particiones:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
En este ejemplo se muestran el nombre de registro y el espacio de nombres:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Para consultar datos de Avro en SQL, registre el archivo de datos como una tabla o una vista temporal:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Ejemplo de cuaderno: lectura y escritura de archivos Avro
En el cuaderno siguiente se muestra cómo leer y escribir archivos Avro.