Condividi tramite


Configurare l'inferenza e l'evoluzione dello schema nell'Autoloader

È possibile configurare il caricatore automatico per rilevare automaticamente lo schema dei dati caricati, consentendo di inizializzare le tabelle senza dichiarare in modo esplicito lo schema dei dati ed evolvere lo schema della tabella man mano che vengono introdotte nuove colonne. In questo modo si elimina la necessità di tenere traccia e applicare manualmente le modifiche dello schema nel tempo.

Il caricatore automatico può anche "salvare" i dati imprevisti (ad esempio, di tipi di dati diversi) in una colonna BLOB JSON, che è possibile scegliere di accedere in un secondo momento usando le API di accesso ai dati semistrutturate.

Per l'inferenza e l'evoluzione dello schema sono supportati i formati seguenti:

File format Versioni supportate
JSON Tutte le versioni
CSV Tutte le versioni
XML Databricks Runtime 14.3 LTS e versioni successive
Avro Databricks Runtime 10.4 LTS e versioni successive
Parquet Databricks Runtime 11.3 LTS e versioni successive
ORC Non supportato
Text Non applicabile (schema fisso)
Binaryfile Non applicabile (schema fisso)

Sintassi per l'inferenza e l'evoluzione dello schema

La specifica di una directory di destinazione per l'opzione cloudFiles.schemaLocation abilita l'inferenza dello schema e l'evoluzione. È possibile scegliere di usare la stessa directory specificata per checkpointLocation. Se si usano tabelle live Delta, Azure Databricks gestisce automaticamente la posizione dello schema e altre informazioni sul checkpoint.

Nota

Se nella tabella di destinazione sono stati caricati più percorsi dati di origine, ogni carico di lavoro di inserimento automatico del caricatore richiede un checkpoint di streaming separato.

Nell'esempio seguente viene parquet usato per .cloudFiles.format Usare csv, avroo json per altre origini file. Tutte le altre impostazioni per la lettura e la scrittura rimangono invariate per i comportamenti predefiniti per ogni formato.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Come funziona l'inferenza dello schema del caricatore automatico?

Per dedurre lo schema durante la prima lettura dei dati, il caricatore automatico campiona i primi 50 GB o 1000 file individuati, a qualunque limite venga superato per primo. Il caricatore automatico archivia le informazioni sullo schema in una directory _schemas configurata cloudFiles.schemaLocation per tenere traccia delle modifiche dello schema ai dati di input nel tempo.

Nota

Per modificare le dimensioni dell'esempio usato, è possibile impostare le configurazioni SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(stringa di byte, ad esempio 10gb)

e

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(intero)

Per impostazione predefinita, l'inferenza dello schema del caricatore automatico cerca di evitare problemi di evoluzione dello schema a causa di mancate corrispondenze del tipo. Per i formati che non codificano i tipi di dati (JSON, CSV e XML), Il caricatore automatico deduce tutte le colonne come stringhe (inclusi i campi annidati nei file JSON). Per i formati con schema tipizzato (Parquet e Avro), Il caricatore automatico esegue l'esempio di un subset di file e unisce gli schemi dei singoli file. Questo comportamento è riepilogato nella tabella seguente:

File format Tipo di dati dedotto predefinito
JSON String
CSV String
XML String
Avro Tipi codificati nello schema Avro
Parquet Tipi codificati nello schema Parquet

Apache Spark DataFrameReader usa un comportamento diverso per l'inferenza dello schema, selezionando i tipi di dati per le colonne in origini JSON, CSV e XML in base ai dati di esempio. Per abilitare questo comportamento con il caricatore automatico, impostare l'opzione cloudFiles.inferColumnTypes su true.

Nota

Quando si deduce lo schema per i dati CSV, il caricatore automatico presuppone che i file contengano intestazioni. Se i file CSV non contengono intestazioni, fornire l'opzione .option("header", "false"). Inoltre, il caricatore automatico unisce gli schemi di tutti i file nell'esempio per ottenere uno schema globale. Il caricatore automatico può quindi leggere ogni file in base all'intestazione e analizzare correttamente il file CSV.

Nota

Quando una colonna ha tipi di dati diversi in due file Parquet, il caricatore automatico sceglie il tipo più ampio. È possibile usare schemaHints per eseguire l'override di questa scelta. Quando si specificano hint per lo schema, il caricatore automatico non esegue il cast della colonna al tipo specificato, ma indica al lettore Parquet di leggere la colonna come tipo specificato. In caso di mancata corrispondenza, la colonna viene salvata nella colonna di dati salvata.

Come funziona l'evoluzione dello schema del caricatore automatico?

Il caricatore automatico rileva l'aggiunta di nuove colonne durante l'elaborazione dei dati. Quando il caricatore automatico rileva una nuova colonna, il flusso si arresta con un oggetto UnknownFieldException. Prima che il flusso generi questo errore, il caricatore automatico esegue l'inferenza dello schema nel micro batch di dati più recente e aggiorna il percorso dello schema con lo schema più recente unendo nuove colonne alla fine dello schema. I tipi di dati delle colonne esistenti rimangono invariati.

Databricks consiglia di configurare flussi del caricatore automatico con flussi di lavoro per il riavvio automatico dopo tali modifiche dello schema.

Il caricatore automatico supporta le modalità seguenti per l'evoluzione dello schema, impostata nell'opzione cloudFiles.schemaEvolutionMode:

Modalità Comportamento durante la lettura di una nuova colonna
addNewColumns (predefinito) Flusso non riesce. Le nuove colonne vengono aggiunte allo schema. Le colonne esistenti non evolvono i tipi di dati.
rescue Lo schema non è mai evoluto e il flusso non ha esito negativo a causa di modifiche dello schema. Tutte le nuove colonne vengono registrate nella colonna di dati salvata.
failOnNewColumns Flusso non riesce. Stream non viene riavviato a meno che lo schema specificato non venga aggiornato o che il file di dati che causa l'offesa venga rimosso.
none Non evolve lo schema, le nuove colonne vengono ignorate e i dati non vengono salvati a meno che non sia impostata l'opzione rescuedDataColumn . Stream non riesce a causa di modifiche dello schema.

Come funzionano le partizioni con l'Autoloader?

Il caricatore automatico tenta di dedurre colonne di partizione dalla struttura di directory sottostante dei dati se i dati sono disposti nel partizionamento in stile Hive. Ad esempio, il percorso base_path/event=click/date=2021-04-01/f0.json del file determina l'inferenza di e event come colonne di date partizione. Se la struttura di directory sottostante contiene partizioni Hive in conflitto o non contiene il partizionamento dello stile Hive, le colonne di partizione vengono ignorate.

I file binari (binaryFile) e text i formati di file hanno schemi di dati fissi, ma supportano l'inferenza delle colonne di partizione. Databricks consiglia di impostare cloudFiles.schemaLocation per questi formati di file. In questo modo si evitano potenziali errori o perdite di informazioni e si impedisce l'inferenza delle colonne delle partizioni ogni volta che inizia un caricatore automatico.

Le colonne di partizione non vengono considerate per l'evoluzione dello schema. Se si dispone di una struttura di directory iniziale come base_path/event=click/date=2021-04-01/f0.jsone quindi iniziare a ricevere nuovi file come base_path/event=click/date=2021-04-01/hour=01/f1.json, Il caricatore automatico ignora la colonna dell'ora. Per acquisire informazioni per le nuove colonne di partizione, impostare su cloudFiles.partitionColumnsevent,date,hour.

Nota

L'opzione cloudFiles.partitionColumns accetta un elenco delimitato da virgole di nomi di colonna. Vengono analizzate solo le colonne esistenti come key=value coppie nella struttura di directory.

Qual è la colonna di dati salvata?

Quando il caricatore automatico deduce lo schema, una colonna di dati salvata viene aggiunta automaticamente allo schema come _rescued_data. È possibile rinominare la colonna o includerla nei casi in cui si specifica uno schema impostando l'opzione rescuedDataColumn.

La colonna di dati salvata garantisce che le colonne che non corrispondono allo schema vengano salvate anziché essere eliminate. La colonna di dati salvata contiene tutti i dati che non vengono analizzati per i motivi seguenti:

  • La colonna non è presente nello schema.
  • Mancata corrispondenza del tipo.
  • Mancata corrispondenza tra maiuscole e minuscole.

La colonna di dati salvata contiene un codice JSON contenente le colonne salvate e il percorso del file di origine del record.

Nota

I parser JSON e CSV supportano tre modalità durante l'analisi dei record: PERMISSIVE, DROPMALFORMEDe FAILFAST. Se usato insieme a rescuedDataColumn, le mancate corrispondenze del tipo di dati non causano l'eliminazione dei record in DROPMALFORMED modalità o generano un errore in FAILFAST modalità . Solo i record danneggiati vengono eliminati o generati errori, ad esempio JSON incompleto o in formato non valido o CSV. Se si usa durante l'analisi badRecordsPath di JSON o CSV, le mancate corrispondenze del tipo di dati non vengono considerate come record non validi quando si usa .rescuedDataColumn Solo i record JSON o CSV incompleti e in formato non valido vengono archiviati in badRecordsPath.

Modificare il comportamento con distinzione tra maiuscole e minuscole

A meno che la distinzione tra maiuscole e minuscole non sia abilitata, le colonne abc, Abce ABC vengono considerate la stessa colonna ai fini dell'inferenza dello schema. Il caso scelto è arbitrario e dipende dai dati campionati. È possibile usare gli hint dello schema per applicare il caso da usare. Dopo aver effettuato una selezione e aver dedotto lo schema, il caricatore automatico non considera le varianti di maiuscole e minuscole non selezionate in modo coerente con lo schema.

Quando la colonna di dati salvata viene abilitata, i campi denominati in un caso diverso da quello dello schema vengono caricati nella _rescued_data colonna. Modificare questo comportamento impostando l'opzione readerCaseSensitive su false, nel qual caso il caricatore automatico legge i dati in modo senza distinzione tra maiuscole e minuscole.

Eseguire l'override dell'inferenza dello schema con hint dello schema

È possibile usare gli hint dello schema per applicare le informazioni sullo schema che si conoscono e si prevedono in uno schema dedotto. Quando si è certi che una colonna è di un tipo di dati specifico o se si vuole scegliere un tipo di dati più generale, ad esempio , doubleintegerè possibile fornire un numero arbitrario di hint per i tipi di dati di colonna come stringa usando la sintassi della specifica dello schema SQL, ad esempio:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Vedere la documentazione sui tipi di dati per l'elenco dei tipi di dati supportati.

Se una colonna non è presente all'inizio del flusso, è anche possibile usare gli hint dello schema per aggiungere tale colonna allo schema dedotto.

Di seguito è riportato un esempio di schema dedotto per visualizzare il comportamento con gli hint dello schema.

Schema dedotto:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Specificando gli hint dello schema seguenti:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

si ottiene:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Nota

Il supporto degli hint per gli schemi di matrice e mapping è disponibile in Databricks Runtime 9.1 LTS e versioni successive.

Di seguito è riportato un esempio di schema dedotto con tipi di dati complessi per visualizzare il comportamento con gli hint dello schema.

Schema dedotto:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Specificando gli hint dello schema seguenti:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

si ottiene:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Nota

Gli hint dello schema vengono usati solo se non si fornisce uno schema al caricatore automatico. È possibile usare hint per lo schema se cloudFiles.inferColumnTypes è abilitato o disabilitato.