Partilhar via


Configurar inferência e evolução do esquema no Carregador Automático

Você pode configurar o Auto Loader para detetar automaticamente o esquema de dados carregados, permitindo inicializar tabelas sem declarar explicitamente o esquema de dados e evoluir o esquema de tabela à medida que novas colunas são introduzidas. Isso elimina a necessidade de rastrear e aplicar manualmente as alterações de esquema ao longo do tempo.

O Auto Loader também pode "resgatar" dados que foram inesperados (por exemplo, de diferentes tipos de dados) em uma coluna de blob JSON, que você pode optar por acessar posteriormente usando as APIs de acesso a dados semiestruturados.

Os seguintes formatos são suportados para inferência e evolução de esquema:

File format Versões suportadas
JSON Todas as versões
CSV Todas as versões
XML Databricks Runtime 14.3 LTS e superior
Avro Databricks Runtime 10.4 LTS e superior
Parquet Databricks Runtime 11.3 LTS e superior
ORC Não suportado
Text Não aplicável (esquema fixo)
Binaryfile Não aplicável (esquema fixo)

Sintaxe para inferência e evolução de esquemas

A especificação de um diretório de destino para a opção cloudFiles.schemaLocation permite a inferência e a evolução do esquema. Pode optar por utilizar o mesmo diretório especificado para o checkpointLocation. Se você usar o Delta Live Tables, o Azure Databricks gerenciará o local do esquema e outras informações de ponto de verificação automaticamente.

Nota

Se você tiver mais de um local de dados de origem sendo carregado na tabela de destino, cada carga de trabalho de ingestão do Auto Loader exigirá um ponto de verificação de streaming separado.

O exemplo a seguir usa parquet para o cloudFiles.format. Use csv, avroou json para outras fontes de arquivo. Todas as outras configurações de leitura e gravação permanecem as mesmas para os comportamentos padrão para cada formato.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Como funciona a inferência de esquema do Auto Loader?

Para inferir o esquema ao ler os dados pela primeira vez, o Auto Loader obtém amostras dos primeiros arquivos de 50 GB ou 1000 descobertos, seja qual for o limite ultrapassado primeiro. O Auto Loader armazena as informações do esquema em um diretório _schemas configurado para acompanhar as alterações de esquema nos dados de entrada ao longo do cloudFiles.schemaLocation tempo.

Nota

Para alterar o tamanho do exemplo usado, você pode definir as configurações SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(byte string, por exemplo 10gb)

e

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(inteiro)

Por padrão, a inferência de esquema do Auto Loader procura evitar problemas de evolução do esquema devido a incompatibilidades de tipo. Para formatos que não codificam tipos de dados (JSON, CSV e XML), o Auto Loader infere todas as colunas como cadeias de caracteres (incluindo campos aninhados em arquivos JSON). Para formatos com esquema tipado (Parquet e Avro), o Auto Loader obtém amostras de um subconjunto de arquivos e mescla os esquemas de arquivos individuais. Esse comportamento é resumido na tabela a seguir:

File format Tipo de dados inferidos padrão
JSON String
CSV String
XML String
Avro Tipos codificados no esquema Avro
Parquet Tipos codificados no esquema Parquet

O Apache Spark DataFrameReader usa um comportamento diferente para inferência de esquema, selecionando tipos de dados para colunas em fontes JSON, CSV e XML com base em dados de exemplo. Para habilitar esse comportamento com o Auto Loader, defina a opção cloudFiles.inferColumnTypes como true.

Nota

Ao inferir o esquema para dados CSV, o Auto Loader assume que os arquivos contêm cabeçalhos. Se os seus ficheiros CSV não contiverem cabeçalhos, forneça a opção .option("header", "false"). Além disso, o Auto Loader mescla os esquemas de todos os arquivos no exemplo para criar um esquema global. Auto Loader pode então ler cada arquivo de acordo com seu cabeçalho e analisar o CSV corretamente.

Nota

Quando uma coluna tem diferentes tipos de dados em dois arquivos Parquet, o Auto Loader escolhe o tipo mais amplo. Você pode usar schemaHints para substituir essa escolha. Quando você especifica dicas de esquema, o Auto Loader não converte a coluna para o tipo especificado, mas diz ao leitor de Parquet para ler a coluna como o tipo especificado. No caso de uma incompatibilidade, a coluna é resgatada na coluna de dados resgatados.

Como funciona a evolução do esquema do Auto Loader?

O Auto Loader deteta a adição de novas colunas à medida que processa os seus dados. Quando o Auto Loader deteta uma nova coluna, o fluxo para com um UnknownFieldExceptionarquivo . Antes de o fluxo lançar esse erro, o Auto Loader executa a inferência de esquema no microlote de dados mais recente e atualiza o local do esquema com o esquema mais recente mesclando novas colunas no final do esquema. Os tipos de dados das colunas existentes permanecem inalterados.

O Databricks recomenda configurar fluxos do Auto Loader com fluxos de trabalho para reiniciar automaticamente após essas alterações de esquema.

Auto Loader suporta os seguintes modos para evolução de esquema, que você define na opção cloudFiles.schemaEvolutionMode:

Modo Comportamento ao ler nova coluna
addNewColumns (padrão) O fluxo falha. Novas colunas são adicionadas ao esquema. As colunas existentes não evoluem os tipos de dados.
rescue O esquema nunca é evoluído e o fluxo não falha devido a alterações de esquema. Todas as novas colunas são registradas na coluna de dados resgatados.
failOnNewColumns O fluxo falha. O fluxo não é reiniciado a menos que o esquema fornecido seja atualizado ou o arquivo de dados ofensivo seja removido.
none Não evolui o esquema, novas colunas são ignoradas e os dados não são resgatados, a menos que a rescuedDataColumn opção seja definida. O fluxo não falha devido a alterações de esquema.

Como funcionam as partições com o Carregador Automático?

O Auto Loader tenta inferir colunas de partição a partir da estrutura de diretórios subjacente dos dados se os dados estiverem dispostos no particionamento no estilo Hive. Por exemplo, o caminho base_path/event=click/date=2021-04-01/f0.json do arquivo resulta na inferência de e event como colunas de date partição. Se a estrutura de diretórios subjacente contiver partições do Hive conflitantes ou não contiver particionamento no estilo Hive, as colunas de partição serão ignoradas.

Os formatos de arquivo binário (binaryFile) e text arquivo têm esquemas de dados fixos, mas suportam inferência de coluna de partição. O Databricks recomenda a configuração cloudFiles.schemaLocation para esses formatos de arquivo. Isso evita possíveis erros ou perda de informações e impede a inferência de colunas de partições cada vez que um Auto Loader começa.

As colunas de partição não são consideradas para a evolução do esquema. Se você tinha uma estrutura de diretório inicial como base_path/event=click/date=2021-04-01/f0.json, e depois começar a receber novos arquivos como base_path/event=click/date=2021-04-01/hour=01/f1.json, Auto Loader ignora a coluna hora. Para capturar informações para novas colunas de partição, defina cloudFiles.partitionColumns como event,date,hour.

Nota

A opção cloudFiles.partitionColumns usa uma lista separada por vírgulas de nomes de colunas. Somente as colunas que existem como key=value pares na estrutura de diretórios são analisadas.

O que é a coluna de dados resgatados?

Quando o Auto Loader infere o esquema, uma coluna de dados resgatada é adicionada automaticamente ao seu esquema como _rescued_data. Você pode renomear a coluna ou incluí-la nos casos em que fornecer um esquema definindo a opção rescuedDataColumn.

A coluna de dados resgatados garante que as colunas que não correspondem ao esquema sejam resgatadas em vez de serem descartadas. A coluna de dados resgatados contém todos os dados que não são analisados pelos seguintes motivos:

  • A coluna está ausente do esquema.
  • Incompatibilidades de tipo.
  • Incompatibilidades de casos.

A coluna de dados resgatados contém um JSON contendo as colunas resgatadas e o caminho do arquivo de origem do registro.

Nota

Os analisadores JSON e CSV suportam três modos ao analisar registros: PERMISSIVE, DROPMALFORMEDe FAILFAST. Quando usado em conjunto com rescuedDataColumno , as incompatibilidades de tipo de dados não fazem com que os registros sejam descartados no DROPMALFORMED modo ou gerem um erro no FAILFAST modo. Somente registros corrompidos são descartados ou geram erros, como JSON ou CSV incompletos ou malformados. Se você usar badRecordsPath ao analisar JSON ou CSV, as incompatibilidades de tipo de dados não serão consideradas registros incorretos ao usar o rescuedDataColumn. Somente registros JSON ou CSV incompletos e malformados são armazenados no badRecordsPath.

Alterar o comportamento que diferencia maiúsculas de minúscul

A menos que a diferenciação de maiúsculas e minúsculas esteja habilitada, as colunas abc, Abce ABC são consideradas a mesma coluna para fins de inferência de esquema. O caso escolhido é arbitrário e depende dos dados amostrados. Você pode usar dicas de esquema para impor qual caso deve ser usado. Uma vez que uma seleção tenha sido feita e o esquema seja inferido, o Auto Loader não considera as variantes de invólucro que não foram selecionadas consistentes com o esquema.

Quando a coluna de dados resgatados está habilitada, os campos nomeados em um caso diferente do esquema são carregados na _rescued_data coluna. Altere esse comportamento definindo a opção readerCaseSensitive como false, caso em que o Auto Loader lê dados de forma que não diferencia maiúsculas de minúsculas.

Substituir inferência de esquema por dicas de esquema

Você pode usar dicas de esquema para impor as informações de esquema que você conhece e espera em um esquema inferido. Quando você sabe que uma coluna é de um tipo de dados específico, ou se quiser escolher um tipo de dados mais geral (por exemplo, um double em vez de um integer), você pode fornecer um número arbitrário de dicas para tipos de dados de coluna como uma cadeia de caracteres usando sintaxe de especificação de esquema SQL, como o seguinte:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Consulte a documentação sobre tipos de dados para obter a lista de tipos de dados suportados.

Se uma coluna não estiver presente no início do fluxo, você também poderá usar dicas de esquema para adicionar essa coluna ao esquema inferido.

Aqui está um exemplo de um esquema inferido para ver o comportamento com dicas de esquema.

Esquema inferido:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Especificando as seguintes dicas de esquema:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

obtém:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Nota

O suporte a dicas de esquema de matriz e mapa está disponível no Databricks Runtime 9.1 LTS e superior.

Aqui está um exemplo de um esquema inferido com tipos de dados complexos para ver o comportamento com dicas de esquema.

Esquema inferido:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Especificando as seguintes dicas de esquema:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

obtém:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Nota

As dicas de esquema são usadas somente se você não fornecer um esquema para o Auto Loader. Você pode usar dicas de esquema se cloudFiles.inferColumnTypes estiver habilitado ou desabilitado.