Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Dedurre ed evolvere lo schema usando
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Questo articolo descrive come dedurre ed evolvere lo schema dei BLOB JSON con la from_json funzione SQL in Pipeline dichiarative di Lakeflow Spark.
Informazioni generali
La from_json funzione SQL analizza una colonna stringa JSON e restituisce un valore di struct. Se usato all'esterno di una pipeline, è necessario specificare in modo esplicito lo schema del valore restituito usando l'argomento schema . Se usato con le pipeline dichiarative di Lakeflow Spark, è possibile abilitare l'inferenza dello schema e l'evoluzione, che gestisce automaticamente lo schema del valore restituito. Questa funzionalità semplifica sia l'installazione iniziale (soprattutto quando lo schema è sconosciuto) che le operazioni in corso quando lo schema cambia frequentemente. Consente l'elaborazione senza interruzioni di BLOB JSON arbitrari da origini dati di streaming, ad esempio Auto Loader, Kafka o Kinesis.
In particolare, quando viene utilizzato in una pipeline, l'inferenza e l'evoluzione dello schema per la funzione SQL from_json possono:
- Rilevare nuovi campi nei record JSON in ingresso (inclusi gli oggetti JSON annidati)
- Dedurre i tipi di campo ed eseguirne il mapping ai tipi di dati Spark appropriati
- Evolvere automaticamente lo schema per contenere nuovi campi
- Gestire automaticamente i dati non conformi allo schema corrente
Sintassi: dedurre ed evolvere automaticamente lo schema
Per abilitare l'inferenza dello schema con from_json in una pipeline, impostare lo schema su NULL e specificare l'opzione schemaLocationKey . In questo modo è possibile dedurre e tenere traccia dello schema.
SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
Pitone
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
Una query può avere più from_json espressioni, ma ogni espressione deve avere un oggetto univoco schemaLocationKey.
schemaLocationKey deve essere anche univoco per ogni pipeline.
SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Pitone
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
Sintassi: schema fisso
Se invece si vuole applicare uno schema specifico, è possibile usare la sintassi seguente from_json per analizzare la stringa JSON usando tale schema:
from_json(jsonStr, schema, [, options])
Questa sintassi può essere usata in qualsiasi ambiente Azure Databricks, incluse le pipeline dichiarative di Lakeflow Spark. Altre informazioni sono disponibili qui.
Inferenza dello schema
from_json deduce lo schema dal primo batch di colonne di dati JSON e lo indicizza internamente in base al relativo schemaLocationKey (obbligatorio).
Se la stringa JSON è un singolo oggetto ( ad esempio , {"id": 123, "name": "John"}), from_json deduce uno schema di tipo STRUCT e aggiunge un rescuedDataColumn oggetto all'elenco di campi.
STRUCT<id LONG, name STRING, _rescued_data STRING>
Tuttavia, se la stringa JSON ha una matrice di primo livello (ad esempio ["id": 123, "name": "John"]), from_json esegue il wrapping dell'array in uno STRUCT. Questo approccio consente di salvare i dati incompatibili con lo schema dedotto. È possibile esplodere i valori della matrice in righe downstream separate.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Oltrepassare l'inferenza dello schema utilizzando i suggerimenti dello schema
Facoltativamente, è possibile fornire schemaHints per influenzare la modalità from_json di deduzione del tipo di una colonna. Ciò è utile quando si sa che una colonna è di un tipo di dati specifico o se si desidera scegliere un tipo di dati più generale, ad esempio un valore double anziché un numero intero. È possibile fornire un numero arbitrario di hint per i tipi di dati delle colonne usando la sintassi della specifica dello schema SQL. La semantica per gli hint dello schema è identica a quella per gli hint dello schema del caricatore automatico. Per esempio:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
Quando la stringa JSON contiene un ARRAY di primo livello, viene incapsulata in una STRUTTURA. In questi casi, gli hint dello schema vengono applicati allo schema ARRAY anziché allo STRUCT di cui è stato eseguito il wrapping. Si consideri, ad esempio, una stringa JSON con una matrice di primo livello, ad esempio:
[{"id": 123, "name": "John"}]
Lo schema ARRAY dedotto è racchiuso in un STRUCT.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Per modificare il tipo di dati di id, specificare l'hint dello schema come element.id STRING. Per aggiungere una nuova colonna di tipo DOUBLE, specificare element.new_col DOUBLE. A causa di questi hint, lo schema per la matrice JSON di primo livello diventa:
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
Evolvere lo schema usando schemaEvolutionMode
from_json rileva l'aggiunta di nuove colonne durante l'elaborazione dei dati. Quando from_json rileva un nuovo campo, aggiorna lo schema dedotto con lo schema più recente unendo nuove colonne alla fine dello schema. I tipi di dati delle colonne esistenti rimangono invariati. Dopo l'aggiornamento dello schema, la pipeline viene riavviata automaticamente con lo schema aggiornato.
from_json supporta le modalità seguenti per l'evoluzione dello schema, impostata usando l'impostazione facoltativa schemaEvolutionMode . Queste modalità sono coerenti con il caricatore automatico.
schemaEvolutionMode |
Comportamento durante la lettura di una nuova colonna |
|---|---|
addNewColumns (impostazione predefinita) |
Il flusso non è riuscito. Le nuove colonne vengono aggiunte allo schema. Le colonne esistenti non evolvono i tipi di dati. |
rescue |
Lo schema non è mai evoluto e il flusso non ha esito negativo a causa di modifiche dello schema. Tutte le nuove colonne vengono registrate nella colonna di dati salvata. |
failOnNewColumns |
Il flusso non è riuscito. Il flusso non viene riavviato a meno che il schemaHints non venga aggiornato o i dati problematici vengano rimossi. |
none |
Non evolve lo schema, le nuove colonne vengono ignorate e i dati non vengono salvati a meno che non sia impostata l'opzione rescuedDataColumn . Stream non fallisce a causa di cambiamenti dello schema. |
Per esempio:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
Colonna di dati salvata
Una colonna di dati salvata viene aggiunta automaticamente allo schema come _rescued_data. È possibile rinominare la colonna impostando l'opzione rescuedDataColumn . Per esempio:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
Quando si sceglie di usare la colonna di dati salvata, tutte le colonne che non corrispondono allo schema dedotto vengono salvate anziché eliminate. Ciò può verificarsi a causa di una mancata corrispondenza del tipo di dati, di una colonna mancante nello schema o di una differenza di maiuscole/minuscole nel nome di colonna.
Gestire i record danneggiati
Per archiviare i record difettosi che non possono essere analizzati, si aggiunge una colonna _corrupt_record impostando i suggerimenti di schema, come nell'esempio seguente:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Per rinominare la colonna di record danneggiata, impostare l'opzione columnNameOfCorruptRecord .
Il parser JSON supporta tre modalità per la gestione dei record danneggiati:
| Mode | Description |
|---|---|
PERMISSIVE |
Per i record danneggiati, inserisce la stringa in formato non valido in un campo configurato da columnNameOfCorruptRecord e imposta campi in formato non valido su null. Per mantenere i record danneggiati, è possibile impostare un campo di tipo stringa denominato columnNameOfCorruptRecord in uno schema definito dall'utente. Se il campo non è presente in uno schema, i record danneggiati vengono eliminati durante l'analisi. Quando si deduce uno schema, il parser aggiunge in modo implicito un columnNameOfCorruptRecord campo nello schema di output. |
DROPMALFORMED |
Ignora i record danneggiati. Quando si usa la DROPMALFORMED modalità con rescuedDataColumn, le mancate corrispondenze del tipo di dati non causano l'eliminazione dei record. Vengono eliminati solo i record danneggiati, ad esempio JSON incompleto o in formato non valido. |
FAILFAST |
Genera un'eccezione quando il parser incontra record danneggiati. Quando si utilizza la modalità FAILFAST con rescuedDataColumn, i mismatch di tipo di dati non generano un errore. Solo i record danneggiati generano errori, ad esempio JSON incompleto o in formato non valido. |
Fare riferimento a un campo nell'output from_json
from_json deduce lo schema durante l'esecuzione della pipeline. Se una query downstream fa riferimento a un from_json campo prima che la from_json funzione venga eseguita correttamente almeno una volta, il campo non viene risolto e la query viene ignorata. Nell'esempio seguente, l'analisi per la query della tabella Silver verrà ignorata fino a quando la funzione nella query della tabella Bronze non è stata eseguita e lo schema non è stato dedotto.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
Se la from_json funzione e i campi inferi vengono indicati nella stessa query, l'analisi potrebbe non riuscire come nell'esempio seguente:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
È possibile risolvere questo problema spostando il riferimento al from_json campo in una query downstream ,ad esempio l'esempio bronze/silver precedente. In alternativa, è possibile specificare schemaHints che contengono i campi di riferimento from_json . Per esempio:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Esempi: inferire ed evolvere automaticamente lo schema
Questa sezione fornisce codice di esempio per abilitare l'inferenza e l'evoluzione automatica dello schema usando from_json in Pipeline dichiarative spark di Lakeflow.
Creare una tabella di streaming dall'archiviazione di oggetti cloud
L'esempio seguente usa la read_files sintassi per creare una tabella di streaming dall'archiviazione di oggetti cloud.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Pitone
@dp.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Creare una tabella di streaming da Kafka
Nell'esempio seguente si usa la sintassi read_kafka per creare una tabella di streaming da Kafka.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
Pitone
@dp.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
Esempi: schema fisso
Per un esempio di codice che usa from_json con uno schema fisso, vedere from_json funzione.
FAQs
Questa sezione risponde alle domande frequenti sull'inferenza dello schema e sul supporto dell'evoluzione nella from_json funzione.
Qual è la differenza tra from_json e parse_json?
La parse_json funzione restituisce un VARIANT valore dalla stringa JSON.
VARIANT offre un modo flessibile ed efficiente per archiviare dati semistrutturati. In questo modo si evita completamente l'inferenza e l'evoluzione dello schema eliminando del tutto i tipi rigorosi. Tuttavia, se si vuole applicare uno schema in fase di scrittura (ad esempio, perché si dispone di uno schema relativamente rigoroso), from_json potrebbe essere un'opzione migliore.
Nella tabella seguente vengono descritte le differenze tra from_json e parse_json:
| Funzione | Casi d'uso | Availability |
|---|---|---|
from_json |
L'evoluzione dello schema con from_json mantiene lo schema. Questo è utile quando:
|
Disponibile solo nelle pipeline dichiarative di Lakeflow Spark con inferenza ed evoluzione dello schema. |
parse_json |
VARIANT è particolarmente adatto per contenere dati che non devono essere schematizzati. Per esempio:
|
Disponibile con e senza pipeline dichiarative di Lakeflow Spark |
È possibile usare la from_json sintassi di inferenza ed evoluzione dello schema all'esterno delle pipeline dichiarative di Lakeflow Spark?
No, non è possibile usare from_json la sintassi di inferenza ed evoluzione dello schema all'esterno delle pipeline dichiarative di Lakeflow Spark.
Come si accede allo schema dedotto da from_json?
Visualizzare lo schema della tabella di streaming di destinazione.
È possibile passare from_json uno schema e anche eseguire l'evoluzione?
No, non è possibile passare from_json uno schema e anche eseguire l'evoluzione. Tuttavia, è possibile fornire hint dello schema per eseguire l'override di alcuni o tutti i campi dedotti da from_json.
Cosa accade allo schema se la tabella viene aggiornata completamente?
I riferimenti dello schema associati alla tabella vengono cancellati e lo schema viene reinferito da zero.