Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Evoluir e inferir o esquema usando
Importante
Esse recurso está em Visualização Pública.
Este artigo descreve como inferir e evoluir o esquema de objetos JSON com a função SQL from_json em Pipelines Declarativas do Lakeflow Spark.
Visão geral
A from_json função SQL analisa uma coluna de cadeia de caracteres JSON e retorna um valor struct. Quando usado fora de um pipeline, você deve fornecer explicitamente o esquema do valor retornado usando o schema argumento. Quando usado com pipelines declarativos do Lakeflow Spark, você pode habilitar a inferência e a evolução do esquema, que gerencia automaticamente o esquema do valor retornado. Esse recurso simplifica tanto a configuração inicial (especialmente quando o esquema é desconhecido) quanto as operações em andamento quando o esquema é alterado com frequência. Ele permite o processamento contínuo de blobs JSON arbitrários de fontes de dados de streaming, como Carregador Automático, Kafka ou Kinesis.
Especificamente, quando usado em um pipeline, a inferência e a evolução do esquema para a from_json função SQL podem:
- Detectar novos campos em registros JSON de entrada (incluindo objetos JSON aninhados)
- Inferir os tipos de campo e mapeá-los para tipos de dados apropriados do Spark.
- Evolua automaticamente o esquema para acomodar novos campos
- Manipular automaticamente dados que não estão em conformidade com o esquema atual
Sintaxe: inferir e evoluir automaticamente o esquema
Para habilitar a inferência de esquema no from_json pipeline, defina o esquema como NULL e especifique a opção schemaLocationKey. Isso permite inferir e controlar o esquema.
SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
Python
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
Uma consulta pode ter várias from_json expressões, mas cada expressão deve ter uma única schemaLocationKey. O schemaLocationKey também deve ser exclusivo por pipeline.
SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
Sintaxe: esquema fixo
Se quiser impor um esquema específico, use a seguinte from_json sintaxe para analisar a cadeia de caracteres JSON usando esse esquema:
from_json(jsonStr, schema, [, options])
Essa sintaxe pode ser usada em qualquer ambiente do Azure Databricks, incluindo o Lakeflow Spark Declarative Pipelines. Mais informações estão disponíveis aqui.
Inferência de esquema
from_json infere o esquema do primeiro lote de colunas de dados JSON e o indexa internamente por seu schemaLocationKey (obrigatório).
Se a cadeia de caracteres JSON for um único objeto (por exemplo, {"id": 123, "name": "John"}), from_json inferirá um esquema do tipo STRUCT e adicionará um rescuedDataColumn à lista de campos.
STRUCT<id LONG, name STRING, _rescued_data STRING>
No entanto, se a cadeia de caracteres JSON tiver uma matriz de nível superior (como ["id": 123, "name": "John"]), em seguida, from_json encapsula a MATRIZ em um STRUCT. Essa abordagem permite o resgate de dados incompatíveis com o esquema inferido. Você tem a opção de explodir os valores da matriz em linhas separadas downstream.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Substituir a inferência de esquema usando instruções de esquema
Opcionalmente, você pode fornecer schemaHints para influenciar como from_json infere o tipo de uma coluna. Isso é útil quando você sabe que uma coluna é de um tipo de dados específico ou se você deseja escolher um tipo de dados mais geral (por exemplo, um duplo em vez de um inteiro). Você pode fornecer um número arbitrário de dicas para tipos de dados de coluna usando a sintaxe de especificação do esquema SQL. A semântica das dicas de esquema é a mesma das dicas de esquema do Carregador Automático. Por exemplo:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
Quando a cadeia de caracteres JSON contém uma MATRIZ de nível superior, ela é encapsulada em um STRUCT. Nesses casos, as dicas de esquema são aplicadas ao esquema ARRAY em vez do STRUCT encapsulado. Por exemplo, considere uma cadeia de caracteres JSON com uma matriz de nível superior, como:
[{"id": 123, "name": "John"}]
O esquema ARRAY inferido é encapsulado em um STRUCT:
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Para alterar o tipo de dados de id, especifique a dica de esquema como element.id STRING. Para adicionar uma nova coluna do tipo DOUBLE, especifique element.new_col DOUBLE. Devido a essas dicas, o esquema da matriz JSON de nível superior se torna:
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
Evoluir o esquema usando schemaEvolutionMode
from_json detecta a adição de novas colunas à medida que processa seus dados. Quando from_json detecta um novo campo, ele atualiza o esquema inferido com o esquema mais recente mesclando novas colunas ao final do esquema. Os tipos de dados de colunas existentes permanecem inalterados. Após a atualização do esquema, o pipeline é reiniciado automaticamente com o esquema atualizado.
from_json dá suporte aos modos a seguir para a evolução do esquema, que você define usando a configuração opcional schemaEvolutionMode . Esses modos são consistentes com o Carregador Automático.
schemaEvolutionMode |
Comportamento ao ler uma nova coluna |
|---|---|
addNewColumns (padrão) |
O fluxo falha. Novas colunas são adicionadas ao esquema. As colunas existentes não evoluem tipos de dados. |
rescue |
O esquema nunca é evoluído e o fluxo não falha devido a alterações de esquema. Todas as novas colunas são registradas na coluna de dados resgatada. |
failOnNewColumns |
O fluxo falha. O fluxo não é reiniciado, a menos que os schemaHints dados sejam atualizados ou que os dados ofensivos sejam removidos. |
none |
Não evolui o esquema, novas colunas são ignoradas e os dados não são resgatados, a menos que a opção rescuedDataColumn esteja definida. O streaming não falha devido a mudanças no esquema. |
Por exemplo:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
Coluna de dados resgatados
Uma coluna de dados resgatada é adicionada automaticamente ao esquema como _rescued_data. Você pode renomear a coluna definindo a opção rescuedDataColumn . Por exemplo:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
Quando você opta por usar a coluna de dados resgatada, todas as colunas que não correspondem ao esquema inferido são resgatadas em vez de descartadas. Isso pode acontecer devido a uma incompatibilidade de tipo de dados, uma coluna ausente no esquema ou uma diferença de maiúsculas e minúsculas no nome da coluna.
Gerenciar registros corrompidos
Para armazenar registros malformados e que não podem ser analisados, adicione uma _corrupt_record coluna definindo dicas de esquema, como no exemplo a seguir:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Para renomear a coluna de registro corrompida, defina a opção columnNameOfCorruptRecord .
O analisador JSON dá suporte a três modos para lidar com registros corrompidos:
| Mode | Description |
|---|---|
PERMISSIVE |
Para registros corrompidos, coloca a cadeia de caracteres malformada em um campo configurado por columnNameOfCorruptRecord e atribui aos campos malformados o valor null. Para preservar registros corrompidos, você pode definir um campo de tipo de cadeia de caracteres nomeado columnNameOfCorruptRecord em um esquema definido pelo usuário. Se um esquema não tiver o campo, os registros corrompidos serão descartados durante a análise. Ao inferir um esquema, o analisador adiciona implicitamente um columnNameOfCorruptRecord campo no esquema de saída. |
DROPMALFORMED |
Ignora registros corrompidos. Quando você usa o modo DROPMALFORMED com rescuedDataColumn, as incompatibilidades de tipo de dados não fazem com que os registros sejam descartados. Somente registros corrompidos são descartados, como JSON incompleto ou malformado. |
FAILFAST |
Gera uma exceção quando o analisador encontra registros corrompidos. Quando você usa o modo FAILFAST com rescuedDataColumn, incompatibilidades de tipo de dados não lançam um erro. Somente registros corrompidos lançam erros, como JSON incompleto ou malformado. |
Consulte um campo na saída from_json
from_json infere o esquema durante a execução do pipeline. Se uma consulta downstream se referir a um from_json campo antes de a from_json função ter sido executada com êxito pelo menos uma vez, o campo não será resolvido e a consulta será ignorada. No exemplo a seguir, a análise da consulta da camada prata será adiada até que a função from_json na consulta da camada bronze tenha sido executada e inferido o esquema.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
Se a from_json função e os campos a que ela infere forem referenciados na mesma consulta, a análise poderá falhar como no exemplo a seguir:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Você pode corrigir isso movendo a referência para o campo from_json em uma consulta downstream (como o exemplo bronze/prata acima). Alternativamente, você pode especificar schemaHints que contêm os campos referenciados from_json. Por exemplo:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Exemplos: inferir e evoluir automaticamente o esquema
Esta seção fornece um código de exemplo para habilitar a inferência e a evolução do esquema automático usando from_json o Lakeflow Spark Declarative Pipelines.
Criar uma tabela de streaming do armazenamento de objetos na nuvem
O exemplo a seguir usa a sintaxe read_files para criar uma tabela de streaming a partir do armazenamento de objetos na nuvem.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
@dp.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Criar uma tabela de streaming no Kafka
O exemplo a seguir usa a sintaxe read_kafka para criar uma tabela em streaming do Kafka.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
Python
@dp.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
Exemplos: esquema fixo
Por exemplo, para código usando from_json com um esquema fixo, consulte a função from_json.
FAQs
Esta seção responde a perguntas frequentes sobre a inferência de esquema e o suporte à evolução na from_json função.
Qual é a diferença entre from_json e parse_json?
A parse_json função retorna um VARIANT valor da cadeia de caracteres JSON.
O VARIANT fornece uma maneira flexível e eficiente de armazenar dados semiestruturados. Isso contorna a inferência e a evolução do esquema ao acabar com tipos estritos completamente. No entanto, se você quiser impor um esquema em tempo de gravação (por exemplo, porque você tem um esquema relativamente estrito), from_json pode ser uma opção melhor.
A tabela a seguir descreve as diferenças entrefrom_json:parse_json
| Função | Casos de uso | Disponibilidade |
|---|---|---|
from_json |
A evolução do esquema com from_json preserva o esquema. Isso é útil quando:
|
Disponível com inferência de esquema & evolução somente em Pipelines Declarativos do Spark do Lakeflow |
parse_json |
VARIANT é particularmente adequado para armazenar dados que não precisam ser esquematizados. Por exemplo:
|
Disponível com e sem Pipelines Declarativos do Lakeflow Spark |
Posso usar from_json a sintaxe de inferência e evolução de esquema fora do Lakeflow Spark Declarative Pipelines?
Não, você não pode usar a sintaxe de inferência e evolução de esquemas from_json fora dos Pipelines Declarativos do Lakeflow Spark.
Como fazer para acessar o esquema inferido por from_json?
Exiba o esquema da tabela de streaming de destino.
Posso passar from_json um esquema e também fazer a evolução?
Não, você não pode passar from_json um esquema e também realizar evoluções. No entanto, você pode fornecer dicas de esquema para substituir alguns ou todos os campos inferidos por from_json.
O que acontece com o esquema se a tabela estiver totalmente atualizada?
Os locais de esquema associados à tabela são removidos e o esquema é inferido novamente desde o início.