Configurar a inferência e a evolução de esquema no Carregador Automático

Você pode configurar o Carregador Automático para detectar automaticamente o esquema de dados carregados, permitindo inicializar tabelas sem declarar explicitamente o esquema de dados e evoluir o esquema de tabela à medida que novas colunas são introduzidas. Isso elimina a necessidade de controlar e aplicar manualmente as alterações de esquema ao longo do tempo.

O Carregador Automático também pode "resgatar" dados inesperados (por exemplo, de tipos de dados diferentes) em uma coluna de blob JSON, que você pode optar por acessar posteriormente usando as APIs de acesso a dados semiestruturados.

Há suporte para os seguintes para inferência e evolução do esquema:

Formato de arquivo Versões com suporte
JSON Todas as versões
CSV Todas as versões
XML Databricks Runtime 14.3 LTS e superior
Avro Databricks Runtime 10.4 LTS e superior
Parquet Databricks Runtime 11.3 LTS e superior
ORC Sem suporte
Text Não aplicável (esquema fixo)
Binaryfile Não aplicável (esquema fixo)

Sintaxe para inferência e evolução de esquema

A especificação de um diretório de destino para a opção cloudFiles.schemaLocation, habilita a inferência e a evolução do esquema. Você pode optar por usar o mesmo diretório especificado para checkpointLocation. Se você usar o Delta Live Tables, o Azure Databricks gerenciará o local do esquema e outras informações de ponto de verificação automaticamente.

Observação

Se você tiver mais de um local de dados de origem sendo carregado na tabela de destino, cada carga de trabalho de ingestão do Carregador Automático exigirá um ponto de verificação de streaming separado.

O exemplo a seguir usa parquet para o cloudFiles.format. Use csv, avro ou json para outras fontes de arquivos. Todas as outras configurações de leitura e gravação permanecem as mesmas para os comportamentos padrão de cada formato.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Como funciona a inferência de esquema do Carregador Automático?

Para inferir o esquema ao ler os dados pela primeira vez, o Carregador Automático amostra os primeiros 50 GB ou 1000 arquivos descobertos, o limite que for cruzado primeiro. O Carregador Automático armazena as informações de esquema em um diretório _schemas no cloudFiles.schemaLocation configurado para acompanhar as alterações de esquema nos dados de entrada ao longo do tempo.

Observação

Para alterar o tamanho do exemplo usado, você poderá definir as configurações do SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(cadeia de caracteres de byte, por exemplo, 10gb)

e

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(inteiro)

Por padrão, a inferência de esquema do Carregador Automático busca evitar problemas de evolução do esquema devido a incompatibilidades de tipo. Para formatos que não codificam tipos de dados (JSON, CSV e XML), o Carregador Automático infere todas as colunas como cadeias de caracteres, incluindo campos aninhados em arquivos JSON. Em formatos com esquema tipado (Parquet e Avro), o Carregador Automático amostra um subconjunto de arquivos e mescla os esquemas de arquivos individuais. Esse comportamento é resumido na tabela a seguir:

Formato de arquivo Tipo de dados inferido padrão
JSON String
CSV String
XML String
Avro Tipos codificados no esquema Avro
Parquet Tipos codificados no esquema Parquet

O DataFrameReader do Apache Spark usa um comportamento diferente para inferência de esquema, selecionando tipos de dados para colunas em fontes JSON, CSV e XML com base nos dados de exemplo. Para habilitar esse comportamento com o Carregador Automático, defina a opção cloudFiles.inferColumnTypes como true.

Observação

Ao inferir o esquema para dados CSV, o Carregador Automático considera que os arquivos contêm cabeçalhos. Se os arquivos CSV não contiverem cabeçalhos, forneça a opção .option("header", "false"). Além disso, o carregador automático mescla os esquemas de todos os arquivos no exemplo para inventar um esquema global. O carregador automático pode ler cada arquivo de acordo com seu cabeçalho e analisar o CSV corretamente.

Observação

Quando uma coluna tem tipos de dados diferentes em dois arquivos Parquet, o Carregador Automático escolhe o tipo mais largo. Você pode usar schemaHints para substituir essa opção. Quando você especifica dicas de esquema, o Carregador Automático não converte a coluna no tipo especificado, mas instrui o leitor de Parquet a ler a coluna como o tipo especificado. No caso de incompatibilidade, a coluna é resgatada na coluna de dados resgatada.

Como funciona a evolução do esquema do Carregador Automático?

O Carregador Automático detecta a adição de novas colunas à medida que processa seus dados. Quando o Carregador Automático detecta uma nova coluna, o fluxo é interrompido com um UnknownFieldException. Antes de o fluxo gerar esse erro, o Carregador Automático executa a inferência de esquema no microlote de dados mais recente e atualiza o local do esquema com o esquema mais recente, mesclando as novas colunas ao final do esquema. Os tipos de dados das colunas existentes permanecem inalterados.

O Databricks recomenda configurar fluxos do Carregador Automático com fluxos de trabalho para reinício automático após essas alterações de esquema.

O Carregador Automático dá suporte aos seguintes modos para a evolução do esquema, que você definiu na opção cloudFiles.schemaEvolutionMode:

Modo Comportamento ao ler a nova coluna
addNewColumns (padrão) O fluxo falha. Novas colunas são adicionadas ao esquema. As colunas existentes não fazem os tipos de dados evoluírem.
rescue O esquema nunca é evoluído e o fluxo não falha devido a alterações no esquema. Todas as novas colunas são registradas na coluna de dados resgatados.
failOnNewColumns O fluxo falha. O fluxo não será reiniciado, a menos que o esquema fornecido seja atualizado ou o arquivo de dados ofensivo seja removido.
none Não evolui o esquema, novas colunas são ignoradas e os dados não são salvos, a menos que a opção rescuedDataColumn esteja definida. O fluxo não falha devido a alterações no esquema.

Como as partições funcionam com o Carregador Automático?

O Carregador Automático tentará inferir colunas de partição da estrutura de diretório subjacente dos dados se os dados forem colocados no particionamento de estilo do Hive. Por exemplo, o caminho de arquivo base_path/event=click/date=2021-04-01/f0.json resulta na inferência de date e event como colunas de partição. Se a estrutura de diretório subjacente contiver partições do Hive conflitantes ou não contiver o particionamento de estilo do Hive, as colunas de partição são ignoradas.

O arquivo binário (binaryFile) e os formatos de arquivo binário text têm esquemas de dados fixos, mas dão suporte à inferência de coluna de partição. O Databricks recomenda a configuração de cloudFiles.schemaLocation para esses formatos de arquivo. Isso evita possíveis erros ou perda de informações e impede a inferência de colunas de partições sempre que um Carregador Automático é iniciado.

As colunas de partição não são consideradas para a evolução do esquema. Se você tiver uma estrutura de diretório inicial como base_path/event=click/date=2021-04-01/f0.jsone, em seguida, começar a receber novos arquivos como base_path/event=click/date=2021-04-01/hour=01/f1.json, o Carregador Automático ignora a coluna de hora. Para capturar informações para novas colunas de partição, defina cloudFiles.partitionColumns como event,date,hour.

Observação

A opção cloudFiles.partitionColumns usa uma lista de nomes de colunas separados por vírgula. Somente colunas que existem como pares key=value em sua estrutura de diretório são analisadas.

Qual é a coluna de dados resgatada?

Quando o Carregador Automático infere o esquema, uma coluna de dados resgatados é adicionada automaticamente ao esquema como _rescued_data. Você pode renomear a coluna ou incluí-la nos casos em que fornecer um esquema definindo a opção rescuedDataColumn.

A coluna de dados resgatada garante que as colunas que não correspondem ao esquema sejam resgatadas em vez de serem descartadas. A coluna de dados resgatados contém quaisquer dados que não são analisados pelas seguintes razões:

  • A coluna foi ignorada no esquema.
  • Incompatibilidades de tipo.
  • Incompatibilidades de caso.

A coluna de dados resgatada contém um JSON com as colunas resgatadas e o caminho do arquivo de origem do registro.

Observação

Os analisadores JSON e CSV dão suporte a três modos ao analisar registros: PERMISSIVE, DROPMALFORMEDe FAILFAST. Quando usado junto a rescuedDataColumn, as incompatibilidades de tipo de dados não fazem com que os registros sejam removidos no modo DROPMALFORMED ou geram um erro no modo FAILFAST. Somente os registros corrompidos são descartados ou geram erros, como um JSON ou um CSV incompleto ou malformado. Se você usar badRecordsPath ao analisar JSON ou CSV, as incompatibilidades de tipo de dados não serão consideradas como registros inválidos ao usar o rescuedDataColumn. Somente registros JSON ou CSV incompletos e malformados são armazenados em badRecordsPath.

Alterar o comportamento de diferenciação de maiúsculas e minúsculas

A menos que a diferenciação de maiúsculas e minúsculas esteja habilitada, as colunas abc, Abc e ABC serão consideradas a mesma coluna para a finalidade da inferência do esquema. A escolha do uso de maiúsculas ou minúsculas é arbitrária e depende dos dados amostrados. Você pode usar dicas de esquema para impor qual caso deve ser usado. Depois que uma seleção é feita e o esquema é inferido, o Carregador Automático não considera as variantes entre maiúsculas e minúsculas que não foram selecionadas de maneira consistente com o esquema.

Quando a coluna de dados resgatada estiver habilitada, os campos nomeados com o uso de maiúsculas e minúsculas diferentes do esquema serão carregados na coluna _rescued_data. Altere esse comportamento definindo a opção readerCaseSensitive como false. Nesse caso, o Carregador Automático lerá os dados de maneira que não diferencie maiúsculas de minúsculas.

Substituir a inferência de esquema com dicas de esquema

Você pode usar dicas de esquema para impor as informações de esquema que você sabe e espera em um esquema inferido. Quando você sabe que uma coluna é de um tipo de dados específico ou se quiser escolher um tipo de dados mais geral (por exemplo, um double em vez de um integer), poderá fornecer um número arbitrário de dicas para tipos de dados de coluna como uma cadeia de caracteres usando a sintaxe de especificação do esquema SQL, como o seguinte:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Confira a documentação sobre tipos de dados para ver a lista de tipos de dados com suporte.

Se uma coluna não estiver presente no início do fluxo, você também poderá usar dicas de esquema para adicionar essa coluna ao esquema inferido.

Aqui está um exemplo de um esquema inferido para ver o comportamento com dicas de esquema.

Esquema inferido:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Ao especificar as seguintes dicas de esquema:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

você recebe:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Observação

O suporte a dicas de esquema de Matriz e Mapa está disponível no Databricks Runtime 9.1 LTS e superior.

Aqui está um exemplo de um esquema inferido com tipos de dados complexos para ver o comportamento com dicas de esquema.

Esquema inferido:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Ao especificar as seguintes dicas de esquema:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

você recebe:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Observação

As dicas de esquema serão usadas somente se você não fornecer um esquema para o Carregador Automático. Você poderá usar dicas de esquema se cloudFiles.inferColumnTypes estiver habilitado ou desabilitado.