Partilhar via


Ficheiro Avro

Apache Avro é um sistema de serialização de dados. Avro fornece:

  • Estruturas de dados avançadas.
  • Um formato de dados compacto, rápido e binário.
  • Um arquivo contêiner, para armazenar dados persistentes.
  • Chamada de procedimento remoto (RPC).
  • Integração simples com linguagens dinâmicas. A geração de código não é necessária para ler ou gravar arquivos de dados nem para usar ou implementar protocolos RPC. Geração de código como uma otimização opcional, só vale a pena implementar para linguagens tipadas estaticamente.

A fonte de dados Avro suporta:

  • Conversão de esquema: Conversão automática entre registros Apache Spark SQL e Avro.
  • Particionamento: Facilmente ler e gravar dados particionados sem qualquer configuração extra.
  • Compressão: Método de compressão a usar ao gravar Avro no disco. Os tipos suportados são uncompressed, snappye deflate. Você também pode especificar o nível de deflação.
  • Registar nomes: Registe o nome e o namespace passando um mapa de parâmetros com recordName e recordNamespace.

Consulte também Ler e gravar dados de streaming Avro.

Configuração

Você pode alterar o comportamento de uma fonte de dados Avro usando vários parâmetros de configuração.

Para ignorar ficheiros sem a extensão .avro durante a leitura, pode-se definir o parâmetro avro.mapred.ignore.inputs.without.extension na configuração do Hadoop. A predefinição é false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Para configurar a compactação durante a gravação, defina as seguintes propriedades do Spark:

  • Codec de compressão: spark.sql.avro.compression.codec. Os codecs suportados são snappy e deflate. O codec padrão é snappy.
  • Se o codec de compressão for deflate, você pode definir o nível de compactação com: spark.sql.avro.deflate.level. O nível padrão é -1.

Você pode definir essas propriedades na configuração do cluster Spark ou em tempo de execução usando spark.conf.set(). Por exemplo:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Para o Databricks Runtime 9.1 LTS e superiores, pode-se alterar o comportamento de inferência de esquema padrão no Avro ao fornecer a opção mergeSchema ao ler ficheiros. Definir mergeSchema como true irá inferir um esquema de um conjunto de arquivos Avro no diretório de destino e mesclá-los em vez de inferir o esquema de leitura a partir de um único arquivo.

Tipos suportados para a conversão de Avro em Spark SQL

Esta biblioteca suporta a leitura de todos os tipos Avro. Ele usa o seguinte mapeamento de tipos Avro para tipos Spark SQL:

Tipo Avro Tipo Spark SQL
Booleano Tipo Booleano
número inteiro Tipo inteiro
longo Tipo Longo
flutuante Tipo de flutuação
duplo Tipo Duplo
Bytes TipoBinário
corda TipoString
gravação Tipo de estrutura
enum TipoString
matriz Tipo de matriz
mapa Tipo de mapa
fixo TipoBinário
união Consulte Tipos de União.

Tipos de União

A fonte de dados Avro suporta a leitura de tipos union. A Avro considera os seguintes três tipos como tipos union :

  • union(int, long) corresponde a LongType.
  • union(float, double) corresponde a DoubleType.
  • union(something, null), onde something é qualquer tipo Avro suportado. Isso mapeia para o mesmo tipo de SQL do Spark que o de something, com nullable configurado como true.

Todos os outros union tipos são tipos complexos. Eles são mapeados para StructType, onde os nomes de campo são member0, member1, e assim por diante, em conformidade com os membros do union. Isso é consistente com o comportamento ao converter entre Avro e Parquet.

Tipos lógicos

A fonte de dados Avro suporta a leitura dos seguintes tipos lógicos Avro:

Tipo lógico Avro Tipo Avro Tipo Spark SQL
data número inteiro Tipo de Data
timestamp-millis longo Tipo de data/hora
carimbo de tempo em microssegundos longo Tipo de data/hora
decimais fixo Tipo decimal
decimais Bytes Tipo decimal

Nota

A fonte de dados Avro ignora documentos, aliases e outras propriedades presentes no arquivo Avro.

Tipos suportados para Spark SQL -> Conversão Avro

Esta biblioteca suporta a escrita de todos os tipos de Spark SQL no Avro. Para a maioria dos tipos, o mapeamento de tipos Spark para tipos Avro é simples (por exemplo IntegerType , é convertido em int); a seguir está uma lista dos poucos casos especiais:

Tipo Spark SQL Tipo Avro Tipo lógico Avro
Tipo de Byte número inteiro
Tipo abreviado número inteiro
TipoBinário Bytes
Tipo decimal fixo decimais
Tipo de data/hora longo carimbo de tempo em microssegundos
Tipo de Data número inteiro data

Você também pode especificar todo o esquema Avro de saída com a opção avroSchema, para que os tipos Spark SQL possam ser convertidos em outros tipos Avro. As conversões a seguir não são aplicadas por padrão e exigem o esquema Avro especificado pelo usuário:

Tipo Spark SQL Tipo Avro Tipo lógico Avro
Tipo de Byte fixo
TipoString enum
Tipo decimal Bytes decimais
Tipo de data/hora longo marca de tempo em milissegundos

Exemplos

Esses exemplos usam o arquivo episodes.avro .

linguagem de programação Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Este exemplo demonstra um esquema Avro personalizado:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Este exemplo demonstra as opções de compactação Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Este exemplo demonstra registros Avro particionados:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Este exemplo demonstra o nome do registro e o namespace:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Píton

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Para consultar dados Avro em SQL, registre o arquivo de dados como uma tabela ou exibição temporária:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Exemplo de bloco de notas: ler e escrever ficheiros Avro

O bloco de anotações a seguir demonstra como ler e gravar arquivos Avro.

Ler e escrever ficheiros Avro em cadernos

Obter caderno