Compartilhar via


Arquivo Avro

O Apache Avro é um sistema de serialização de dados. O Avro fornece:

  • Estruturas de dados avançadas.
  • Um formato de dados binário, rápido e compacto.
  • Um arquivo de contêiner para armazenar dados persistentes.
  • Chamada de procedimento remoto (RPC).
  • Integração simples com linguagens dinâmicas. A geração de código não é necessária para ler ou gravar arquivos de dados nem para usar ou implementar protocolos de RPC. Geração de código como uma otimização opcional vale a pena implementar apenas para linguagens de tipo estático.

A fonte de dados Avro dá suporte a:

  • Conversão de esquema: conversão automática entre registros do Apache Spark SQL e do Avro.
  • Particionamento: leitura e gravação fácil de dados particionados sem nenhuma configuração extra.
  • Compactação: a compactação a ser usada ao gravar o Avro no disco. Os tipos com suporte são uncompressed, snappy e deflate. Também é possível especificar o nível de esvaziamento (deflate).
  • Nomes de registro: registre o nome e o namespace passando um mapa de parâmetros com recordName e recordNamespace.

Veja também Ler e gravar dados de streaming do Avro.

Configuração

Você pode alterar o comportamento de uma fonte de dados Avro usando vários parâmetros de configuração.

Para ignorar arquivos sem a extensão .avro durante a leitura, você pode definir o parâmetro avro.mapred.ignore.inputs.without.extension na configuração do Hadoop. O padrão é false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Para configurar a compactação durante a gravação, defina as seguintes propriedades do Spark:

  • Codec de compactação: spark.sql.avro.compression.codec. Os codecs com suporte são snappy e deflate. O codec padrão é snappy.
  • Se o codec de compactação for deflate, você poderá definir o nível de compactação com: spark.sql.avro.deflate.level. O nível padrão é -1.

Você pode definir essas propriedades na configuração do Spark do cluster ou em runtime usando spark.conf.set(). Por exemplo:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Para o Databricks Runtime 9.1 LTS e superiores, você pode alterar o comportamento de inferência do esquema padrão no Avro fornecendo a opção mergeSchema ao ler arquivos. Definir mergeSchema como true inferirá um esquema de um conjunto de arquivos Avro no diretório de destino e os mesclará em vez de inferir o esquema de leitura de um único arquivo.

Tipos com suporte para conversão de Spark SQL -> Avro

Esta biblioteca dá suporte à leitura de todos os tipos Avro. Ela usa o seguinte mapeamento de tipos Avro para tipos Spark SQL:

Tipos Avro Tipos Spark SQL
booleano Tipo Booleano
INT IntegerType
Longas LongType
derivar Tipo Flutuante
duplo DoubleType
Bytes Tipo Binário
cadeia Tipo de cadeia de caracteres
registro TipoDeEstrutura
enumeração Tipo de cadeia de caracteres
matriz Tipo de Array
mapa TipoDeMapa
corrigido Tipo Binário
união Confira Tipos de união.

Tipos de união

A fonte de dados Avro dá suporte à leitura de tipos union. O Avro considera os três tipos a seguir como tipos union:

  • O union(int, long) é mapeado para LongType.
  • O union(float, double) é mapeado para DoubleType.
  • O union(something, null), onde something é qualquer tipo Avro com suporte. Isso é mapeado para o mesmo tipo Spark SQL de something, com nullable definido como true.

Todos os outros tipos union são tipos complexos. Eles são mapeados para StructType onde os nomes de campo são member0, member1 e assim por diante, de acordo com os membros do union. Isso é consistente com o comportamento ao converter entre Avro e Parquet.

Tipos lógicos

A fonte de dados Avro dá suporte à leitura dos seguintes tipos lógicos Avro:

Tipos lógicos Avro Tipos Avro Tipos Spark SQL
data INT Tipo de Data
carimbo de data/hora-millis Longas Tipo de Timestamp
carimbo de data/hora micros Longas Tipo de Timestamp
decimal corrigido Tipo Decimal
decimal Bytes Tipo Decimal

Observação

A fonte de dados Avro ignora documentos, aliases e outras propriedades presentes no arquivo Avro.

Tipos com suporte para conversão entre Spark SQL -> Avro

Esta biblioteca dá suporte à gravação de todos os tipos Spark SQL em Avro. Para a maioria dos tipos, o mapeamento de tipos Spark para tipos Avro é simples (por exemplo IntegerType é convertido em int). A seguir está uma lista dos alguns casos especiais:

Tipos Spark SQL Tipos Avro Tipos lógicos Avro
Tipo de Byte INT
Tipo curto INT
Tipo Binário Bytes
Tipo Decimal corrigido decimal
Tipo de Timestamp Longas carimbo de data/hora micros
Tipo de Data INT data

Você também pode especificar todo o esquema Avro de saída com a opção avroSchema, para que os tipos Spark SQL possam ser convertidos em outros tipos Avro. As conversões a seguir não são aplicadas por padrão e exigem esquema Avro especificado pelo usuário:

Tipos Spark SQL Tipos Avro Tipos lógicos Avro
Tipo de Byte corrigido
Tipo de cadeia de caracteres enumeração
Tipo Decimal Bytes decimal
Tipo de Timestamp Longas carimbo de data/hora-millis

Exemplos

Esses exemplos usam o arquivo episodes.avro.

Scala (linguagem de programação)

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Este exemplo demonstra um esquema Avro personalizado:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Este exemplo demonstra as opções de compactação Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Este exemplo demonstra registros Avro particionados:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Este exemplo demonstra o nome do registro e o namespace:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Para consultar dados Avro no SQL, registre o arquivo de dados como uma tabela ou exibição temporária:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Exemplo de notebook: ler e gravar arquivos Avro

O notebook a seguir demonstra como ler e gravar arquivos Avro.

Ler e gravar o notebook de arquivos Avro

Obter notebook