Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Delta Lake consente di aggiornare lo schema di una tabella. Sono supportati i tipi seguenti di modifiche:
- Aggiunta di nuove colonne (in posizioni arbitrarie)
- Riordinare le colonne esistenti
- Ridenominazione delle colonne esistenti
È possibile apportare queste modifiche in modo esplicito usando DDL o in modo implicito usando DML.
Importante
Un aggiornamento di uno schema di tabella Delta è un'operazione in conflitto con tutte le operazioni di scrittura Delta simultanee.
Quando si aggiorna uno schema della tabella Delta, i flussi letti da tale tabella terminano. Se si vuole che lo streaming continui, è necessario riavviarlo. Per i metodi consigliati, vedere Considerazioni sulla produzione per Structured Streaming.
Aggiornare in modo esplicito lo schema per aggiungere colonne
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Per impostazione predefinita, il supporto dei valori Null è true.
Per aggiungere una colonna a un campo annidato, usare:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Ad esempio, se lo schema prima dell'esecuzione ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) è:
- root
| - colA
| - colB
| +-field1
| +-field2
lo schema dopo è:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Nota
L'aggiunta di colonne annidate è supportata solo per gli struct. Le matrici e le mappe non sono supportate.
Aggiornare in modo esplicito lo schema per modificare il commento o l'ordinamento delle colonne
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Per modificare una colonna in un campo annidato, usare:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Ad esempio, se lo schema prima dell'esecuzione ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST è:
- root
| - colA
| - colB
| +-field1
| +-field2
lo schema dopo è:
- root
| - colA
| - colB
| +-field2
| +-field1
Aggiornare in modo esplicito lo schema per sostituire le colonne
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Ad esempio, quando si esegue il DDL seguente:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
se lo schema prima è:
- root
| - colA
| - colB
| +-field1
| +-field2
lo schema dopo è:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Aggiornare in modo esplicito lo schema per rinominare le colonne
Nota
Questa funzionalità è disponibile in Databricks Runtime 10.4 LTS e versioni successive.
Per rinominare le colonne senza riscrivere i dati esistenti delle colonne, è necessario abilitare il mapping delle colonne per la tabella. Si veda Rinominare ed eliminare le colonne con mapping di colonne Delta Lake.
Per rinominare una colonna:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Per rinominare un campo annidato:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Ad esempio, quando si esegue il comando seguente:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Se lo schema prima è:
- root
| - colA
| - colB
| +-field1
| +-field2
Lo schema dopo è quindi:
- root
| - colA
| - colB
| +-field001
| +-field2
Si veda Rinominare ed eliminare le colonne con mapping di colonne Delta Lake.
Aggiornare in modo esplicito lo schema per eliminare le colonne
Nota
Questa funzionalità è disponibile in Databricks Runtime 11.3 LTS e versioni successive.
Per eliminare le colonne come operazione di sola metadati senza riscrivere alcun file di dati, è necessario abilitare il mapping delle colonne per la tabella. Si veda Rinominare ed eliminare le colonne con mapping di colonne Delta Lake.
Importante
L'eliminazione di una colonna dai metadati non comporta l'eliminazione dei dati sottostanti per la colonna nei file. Per eliminare i dati delle colonne eliminate, è possibile usare REORG TABLE per riscrivere i file. È quindi possibile usare VACUUM per eliminare fisicamente i file che contengono i dati delle colonne eliminate.
Per eliminare una colonna:
ALTER TABLE table_name DROP COLUMN col_name
Per eliminare più colonne:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Aggiornare in modo esplicito lo schema per modificare il tipo di colonna o il nome
È possibile modificare il tipo o il nome di una colonna o eliminare una colonna riscrivendo la tabella. A tale scopo, usare l'opzione overwriteSchema .
L'esempio seguente illustra la modifica di un tipo di colonna:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
L'esempio seguente mostra la modifica di un nome di colonna:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Abilitare l'evoluzione dello schema
È possibile abilitare l'evoluzione dello schema eseguendo una delle operazioni seguenti:
- Impostare su
.option("mergeSchema", "true")un dataframewriteowriteStreamun'operazione Spark. Vedere Abilitare l'evoluzione dello schema per le scritture per aggiungere nuove colonne. - Usare la
MERGE WITH SCHEMA EVOLUTIONsintassi. Vedere Sintassi dell'evoluzione dello schema per unire. - Impostare spark conf
spark.databricks.delta.schema.autoMerge.enabledsutrueper l'oggetto SparkSession corrente.
Databricks consiglia di abilitare l'evoluzione dello schema per ogni operazione di scrittura anziché impostare una conf Spark.
Quando si usano opzioni o sintassi per abilitare l'evoluzione dello schema in un'operazione di scrittura, questa ha la precedenza sulla conf spark.
Nota
Non esiste alcuna clausola di evoluzione dello schema per INSERT INTO le istruzioni.
Abilitare l'evoluzione dello schema per le scritture per aggiungere nuove colonne
Le colonne presenti nella query di origine ma mancanti nella tabella di destinazione vengono aggiunte automaticamente come parte di una transazione di scrittura quando l'evoluzione dello schema è abilitata. Vedere Abilitare l'evoluzione dello schema.
La distinzione tra maiuscole e minuscole viene mantenuta quando si aggiunge una nuova colonna. Le nuove colonne vengono aggiunte alla fine dello schema della tabella. Se le colonne aggiuntive si trovano in uno struct, vengono aggiunte alla fine dello struct nella tabella di destinazione.
L'esempio seguente illustra l'uso dell'opzione mergeSchema con il caricatore automatico. Vedere Che cos'è l’Autoloader?.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
L'esempio seguente illustra l'uso dell'opzione mergeSchema con un'operazione di scrittura batch:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Evoluzione automatica dello schema per l'unione delta Lake
L'evoluzione dello schema consente di risolvere le mancate corrispondenze dello schema tra la tabella di destinazione e quella di origine in merge. Gestisce i due casi seguenti:
Una colonna esiste nella tabella di origine, ma non nella tabella di destinazione e viene specificata in base al nome in un'assegnazione di azioni di inserimento o aggiornamento. In alternativa, è presente un'azione
UPDATE SET *oINSERT *.Tale colonna verrà aggiunta allo schema di destinazione e i relativi valori verranno popolati dalla colonna corrispondente nell'origine.
Questo vale solo quando il nome e la struttura della colonna nell'origine della fusione corrispondono esattamente all'assegnazione target.
La nuova colonna deve essere presente nello schema di origine. L'assegnazione della nuova colonna nella clausola action non definisce tale colonna.
Questi esempi consentono l'evoluzione dello schema:
-- The column newcol is present in the source but not in the target. It will be added to the target. UPDATE SET target.newcol = source.newcol -- The field newfield doesn't exist in struct column somestruct of the target. It will be added to that struct column. UPDATE SET target.somestruct.newfield = source.somestruct.newfield -- The column newcol is present in the source but not in the target. -- It will be added to the target. UPDATE SET target.newcol = source.newcol + 1 -- Any columns and nested fields in the source that don't exist in target will be added to the target. UPDATE SET * INSERT *Questi esempi non attivano l'evoluzione dello schema se la colonna
newcolnon è presente nellosourceschema:UPDATE SET target.newcol = source.someothercol UPDATE SET target.newcol = source.x + source.y UPDATE SET target.newcol = source.output.newcolEsiste una colonna nella tabella di destinazione, ma non nella tabella di origine.
Lo schema di destinazione non viene modificato. Queste colonne:
Rimane invariato per
UPDATE SET *.Sono impostate
NULLsu perINSERT *.Può ancora essere modificato esplicitamente se assegnato nella clausola di azione.
Per esempio:
UPDATE SET * -- The target columns that are not in the source are left unchanged. INSERT * -- The target columns that are not in the source are set to NULL. UPDATE SET target.onlyintarget = 5 -- The target column is explicitly updated. UPDATE SET target.onlyintarget = source.someothercol -- The target column is explicitly updated from some other source column.
È necessario abilitare manualmente l'evoluzione automatica dello schema. Vedere Abilitare l'evoluzione dello schema.
Nota
In Databricks Runtime 12.2 LTS e versioni successive i campi colonne e struct presenti nella tabella di origine possono essere specificati in base al nome nelle azioni di inserimento o aggiornamento. In Databricks Runtime 11.3 LTS e versioni successive è possibile usare solo INSERT * azioni o UPDATE SET * per l'evoluzione dello schema con merge.
In Databricks Runtime 13.3 LTS e versioni successive è possibile usare l'evoluzione dello schema con struct annidati all'interno delle mappe, ad esempio map<int, struct<a: int, b: int>>.
Sintassi dell'evoluzione dello schema per l'unione
In Databricks Runtime 15.4 LTS e versioni successive è possibile specificare l'evoluzione dello schema in un'istruzione merge usando le API di tabella SQL o Delta:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Pitone
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Linguaggio di programmazione Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Operazioni di esempio di unione con evoluzione dello schema
Ecco alcuni esempi degli effetti dell'operazione merge con e senza evoluzione dello schema.
| Colonne | Query (interrogazione in SQL) | Comportamento senza evoluzione dello schema (impostazione predefinita) | Comportamento con l'evoluzione dello schema |
|---|---|---|---|
Colonne di destinazione: key, valueColonne di origine: key, value, new_value |
MERGE INTO target_table tUSING source_table sON t.key = s.keyWHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT * |
Lo schema della tabella rimane invariato; solo le colonne key, value vengono aggiornate/inserite. |
Lo schema della tabella viene modificato in (key, value, new_value). I record esistenti con corrispondenze vengono aggiornati con value e new_value nell'origine. Le nuove righe vengono inserite con lo schema (key, value, new_value). |
Colonne di destinazione: key, old_valueColonne di origine: key, new_value |
MERGE INTO target_table tUSING source_table sON t.key = s.keyWHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT * |
UPDATE le azioni e INSERT generano un errore perché la colonna old_value di destinazione non si trova nell'origine. |
Lo schema della tabella viene modificato in (key, old_value, new_value). I record esistenti con corrispondenze vengono aggiornati con nell'origine new_value lasciando old_value invariati. I nuovi record vengono inseriti con l'oggetto , keye new_value specificato NULLper .old_value |
Colonne di destinazione: key, old_valueColonne di origine: key, new_value |
MERGE INTO target_table tUSING source_table sON t.key = s.keyWHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE genera un errore perché la colonna new_value non esiste nella tabella di destinazione. |
Lo schema della tabella viene modificato in (key, old_value, new_value). I record esistenti con corrispondenze vengono aggiornati con nell'oggetto new_value nell'origine lasciando old_value invariati e i record non corrispondenti sono NULL stati immessi per new_value. Vedere la nota (1). |
Colonne di destinazione: key, old_valueColonne di origine: key, new_value |
MERGE INTO target_table tUSING source_table sON t.key = s.keyWHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT genera un errore perché la colonna new_value non esiste nella tabella di destinazione. |
Lo schema della tabella viene modificato in (key, old_value, new_value). I nuovi record vengono inseriti con l'oggetto , keye new_value specificato NULLper .old_value I record esistenti sono NULL stati immessi per new_value lasciare old_value invariati. Vedere la nota (1). |
(1) Questo comportamento è disponibile in Databricks Runtime 12.2 LTS e versioni successive; Databricks Runtime 11.3 LTS e sotto l'errore in questa condizione.
Escludere colonne con merge Delta Lake
In Databricks Runtime 12.2 LTS e versioni successive è possibile usare EXCEPT le clausole nelle condizioni di merge per escludere in modo esplicito le colonne. Il comportamento della EXCEPT parola chiave varia a seconda che l'evoluzione dello schema sia abilitata o meno.
Con l'evoluzione dello schema disabilitata, la EXCEPT parola chiave si applica all'elenco di colonne nella tabella di destinazione e consente di escludere colonne da UPDATE o INSERT azioni. Le colonne escluse sono impostate su null.
Con l'evoluzione dello schema abilitata, la EXCEPT parola chiave si applica all'elenco di colonne nella tabella di origine e consente di escludere colonne dall'evoluzione dello schema. Una nuova colonna nell'origine non presente nella destinazione non viene aggiunta allo schema di destinazione se è elencata nella EXCEPT clausola . Le colonne escluse già presenti nella destinazione sono impostate su null.
Gli esempi seguenti illustrano questa sintassi:
| Colonne | Query (interrogazione in SQL) | Comportamento senza evoluzione dello schema (impostazione predefinita) | Comportamento con l'evoluzione dello schema |
|---|---|---|---|
Colonne di destinazione: id, title, last_updatedColonne di origine: id, title, review, last_updated |
MERGE INTO target tUSING source sON t.id = s.idWHEN MATCHED THEN UPDATE SET last_updated = current_date()WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
Le righe corrispondenti vengono aggiornate impostando il last_updated campo sulla data corrente. Le nuove righe vengono inserite usando i valori per id e title. Il campo last_updated escluso è impostato su null. Il campo review viene ignorato perché non si trova nella destinazione. |
Le righe corrispondenti vengono aggiornate impostando il last_updated campo sulla data corrente. Lo schema si è evoluto per aggiungere il campo review. Le nuove righe vengono inserite usando tutti i campi di origine, ad eccezione last_updated del quale è impostato su null. |
Colonne di destinazione: id, title, last_updatedColonne di origine: id, title, review, internal_count |
MERGE INTO target tUSING source sON t.id = s.idWHEN MATCHED THEN UPDATE SET last_updated = current_date()WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT genera un errore perché la colonna internal_count non esiste nella tabella di destinazione. |
Le righe corrispondenti vengono aggiornate impostando il last_updated campo sulla data corrente. Il review campo viene aggiunto alla tabella di destinazione, ma il internal_count campo viene ignorato. Le nuove righe inserite sono last_updated impostate su null. |
NullType Gestione delle colonne negli aggiornamenti dello schema
Poiché Parquet non supporta NullType, NullType le colonne vengono eliminate dal dataframe durante la scrittura in tabelle Delta, ma vengono comunque archiviate nello schema. Quando viene ricevuto un tipo di dati diverso per tale colonna, Delta Lake unisce lo schema al nuovo tipo di dati. Se Delta Lake riceve un oggetto NullType per una colonna esistente, lo schema precedente viene mantenuto e la nuova colonna viene eliminata durante la scrittura.
NullType in streaming non è supportato. Poiché è necessario impostare gli schemi quando si usa lo streaming, questo dovrebbe essere molto raro.
NullType non è inoltre accettato per tipi complessi, ad ArrayType esempio e MapType.
Sostituire lo schema delle tabelle
Per impostazione predefinita, la sovrascrittura dei dati in una tabella non sovrascrive lo schema. Quando si sovrascrive una tabella usando mode("overwrite") senza replaceWhere, è comunque possibile sovrascrivere lo schema dei dati scritti. Sostituire lo schema e il partizionamento della tabella impostando l'opzione overwriteSchema su true:
df.write.option("overwriteSchema", "true")
Importante
Non è possibile specificare overwriteSchema come true quando si usa la sovrascrittura della partizione dinamica.