Aktualisieren eines Delta Lake-Tabellenschemas
Mit Delta Lake können Sie das Schema einer Tabelle aktualisieren. Die folgenden Typen von Änderungen werden unterstützt:
- Hinzufügen neuer Spalten (an beliebigen Positionen)
- Neuanordnen vorhandener Spalten
- Umbenennen vorhandener Spalten
Sie können diese Änderungen explizit über die DDL oder implizit über die DML vornehmen.
Wichtig
Ein Update auf ein Delta-Tabellenschema ist ein Vorgang, der mit allen gleichzeitigen Delta-Schreibvorgängen in Konflikt steht.
Wenn Sie ein Delta-Tabellenschema aktualisieren, werden Streams, die aus dieser Tabelle lesen, beendet. Wenn der Stream fortgesetzt werden soll, müssen Sie ihn neu starten. Empfohlene Methoden finden Sie unter Produktionsüberlegungen für strukturiertes Streaming.
Explizites Aktualisieren des Schemas zum Hinzufügen von Spalten
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Die NULL-Zulässigkeit ist standardmäßig auf true
festgelegt.
Um einem geschachtelten Feld eine Spalte hinzuzufügen, verwenden Sie folgenden Befehl:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Wenn das Schema vor der Ausführung von ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
z. B. folgendermaßen lautet:
- root
| - colA
| - colB
| +-field1
| +-field2
lautet das Schema nach der Ausführung folgendermaßen:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Hinweis
Das Hinzufügen geschachtelter Spalten wird nur für Strukturen unterstützt. Arrays und Zuordnungen werden nicht unterstützt.
Explizites Aktualisieren des Schemas zum Ändern des Kommentars oder der Reihenfolge von Spalten
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Um eine Spalte in einem geschachtelten Feld zu ändern, verwenden Sie folgenden Befehl:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Wenn das Schema vor der Ausführung von ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
z. B. folgendermaßen lautet:
- root
| - colA
| - colB
| +-field1
| +-field2
lautet das Schema nach der Ausführung folgendermaßen:
- root
| - colA
| - colB
| +-field2
| +-field1
Explizites Aktualisieren des Schemas zum Ersetzen von Spalten
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Bei der Ausführung der folgenden DDL-Anweisung:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
wenn das Schema vorher folgendermaßen lautet:
- root
| - colA
| - colB
| +-field1
| +-field2
lautet das Schema nach der Ausführung folgendermaßen:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Explizites Aktualisieren des Schemas zum Umbenennen von Spalten
Hinweis
Dieses Feature ist in Databricks Runtime 10.4 LTS und höher verfügbar.
Um Spalten umzubenennen, ohne vorhandene Daten der Spalten neu zu schreiben, müssen Sie die Spaltenzuordnung für die Tabelle aktivieren. Weitere Informationen finden Sie unter Rename and drop columns with Delta Lake column mapping (Umbenennen und Löschen von Spalten mit Delta Lake-Spaltenzuordnung).
So benennen Sie eine Spalte um
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
So benennen Sie ein geschachteltes Feld um
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Bei der Ausführung des folgenden Befehls:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
wenn das Schema vorher folgendermaßen lautet:
- root
| - colA
| - colB
| +-field1
| +-field2
lautet das Schema nach der Ausführung folgendermaßen:
- root
| - colA
| - colB
| +-field001
| +-field2
Weitere Informationen finden Sie unter Rename and drop columns with Delta Lake column mapping (Umbenennen und Löschen von Spalten mit Delta Lake-Spaltenzuordnung).
Explizites Aktualisieren des Schemas zum Löschen von Spalten
Hinweis
Dieses Feature ist in Databricks Runtime 11.3 LTS und höher verfügbar.
Wenn Sie Spalten im Rahmen eines reinen Metadatenvorgangs löschen möchten, ohne Datendateien neu zu schreiben, müssen Sie die Spaltenzuordnung für die Tabelle aktivieren. Weitere Informationen finden Sie unter Rename and drop columns with Delta Lake column mapping (Umbenennen und Löschen von Spalten mit Delta Lake-Spaltenzuordnung).
Wichtig
Wenn Sie eine Spalte über Metadaten löschen, werden die zugrunde liegenden Daten für die Spalte in Dateien nicht gelöscht. Zum Löschen der Daten der gelöschten Spalte können Sie REORG TABLE verwenden, um Dateien neu zu schreiben. Anschließend können Sie VACUUM verwenden, um die Dateien, die die Daten der gelöschten Spalte enthalten, physisch zu löschen.
So löschen Sie eine Spalte
ALTER TABLE table_name DROP COLUMN col_name
So löschen Sie mehrere Spalten
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Explizites Aktualisieren des Schemas zum Ändern des Spaltentyps oder -namens
Sie können den Typ oder Namen einer Spalte ändern oder eine Spalte löschen, indem Sie die Tabelle neu schreiben. Verwenden Sie hierzu die Option overwriteSchema
.
Das folgende Beispiel zeigt das Ändern eines Spaltentyps:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Das folgende Beispiel zeigt das Ändern eines Spaltennamens:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Aktivieren der Schemaentwicklung
Sie können die Schemaentwicklung aktivieren, indem Sie eine der folgenden Aktionen ausführen:
- Legen Sie
.option("mergeSchema", "true")
auf einen Spark DataFramewrite
- oderwriteStream
-Vorgang fest. Siehe Aktivieren der Schemaentwicklung für Schreibvorgänge, um neue Spalten hinzuzufügen. - Verwenden Sie die Syntax
MERGE WITH SCHEMA EVOLUTION
. Weitere Informationen finden Sie unter Schemaentwicklungssyntax für Merge. - Setzen Sie die Spark conf
spark.databricks.delta.schema.autoMerge.enabled
für die aktuelle SparkSession auftrue
.
Databricks empfiehlt, die Schemaentwicklung für jeden Schreibvorgang zu aktivieren, anstatt eine Spark conf festzulegen.
Wenn Sie Optionen oder Syntax verwenden, um die Schemaentwicklung in einem Schreibvorgang zu aktivieren, hat dies Vorrang vor der Spark conf.
Hinweis
Es gibt keine Schemaentwicklungsklausel für INSERT INTO
-Anweisungen.
Aktivieren der Schemaentwicklung für Schreibvorgänge, um neue Spalten hinzuzufügen
Spalten, die in der Quellabfrage vorhanden sind, aber in der Zieltabelle fehlen, werden automatisch als Teil einer Schreibtransaktion hinzugefügt, wenn die Schemaentwicklung aktiviert ist. Siehe Aktivieren der Schemaentwicklung.
Die Groß-/Kleinschreibung wird beim Anfügen einer neuen Spalte beibehalten. Neue Spalten werden am Ende des Tabellenschemas hinzugefügt. Wenn sich die zusätzlichen Spalten in einer Struktur befinden, werden sie am Ende der Struktur in der Zieltabelle angefügt.
Das folgende Beispiel veranschaulicht die Verwendung der Option mergeSchema
mit dem Autoloader. Weitere Informationen finden Sie unter Automatisches Laden.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
Im folgenden Beispiel wird die Verwendung der Option mergeSchema
mit einem Batchschreibvorgang veranschaulicht:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Automatische Schemaentwicklung für Delta Lake-Merge
Die Schemaentwicklung ermöglicht es Benutzern, Schemakonflikte zwischen der Ziel- und der Quelltabelle beim Zusammenführen zu beheben. Es werden die folgenden beiden Fälle behandelt:
- Eine Spalte in der Quelltabelle ist in der Zieltabelle nicht vorhanden. Die neue Spalte wird dem Zielschema hinzugefügt, und ihre Werte werden mithilfe der Quellwerte eingefügt oder aktualisiert.
- Eine Spalte in der Zieltabelle ist in der Quelltabelle nicht vorhanden. Das Zielschema bleibt unverändert. Die Werte in der zusätzlichen Zielspalte bleiben entweder unverändert (für
UPDATE
) oder werden aufNULL
(fürINSERT
) festgelegt.
Sie müssen die automatische Schemaentwicklung manuell aktivieren. Siehe Aktivieren der Schemaentwicklung.
Hinweis
In Databricks Runtime 12.2 LTS und höher können Spalten und Strukturfelder, die in der Quelltabelle vorhanden sind, in Einfüge- oder Updateaktionen durch den Namen angegeben werden. In Databricks Runtime 11.3 LTS und niedriger können nur INSERT *
- oder UPDATE SET *
-Aktionen für die Schemaentwicklung mit Merge verwendet werden.
In Databricks Runtime 13.3 LTS und höher können Sie die Schemaentwicklung mit Strukturen verwenden, die in Zuordnungen geschachtelt sind, z. B. map<int, struct<a: int, b: int>>
.
Schemaentwicklungssyntax für Merge
In Databricks Runtime 15.2 und höher können Sie die Schemaentwicklung in einer Zusammenführungsanweisung mithilfe von SQL oder Delta-Tabellen-APIs angeben:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Beispielvorgänge für Merge mit Schemaentwicklung
Hier sind einige Beispiele für die Auswirkungen eines merge
-Vorgangs mit und ohne Schemaentwicklung.
Spalten | Abfrage (in SQL) | Verhalten ohne Schemaentwicklung (Standard) | Verhalten mit Schemaentwicklung |
---|---|---|---|
Zielspalten: key, value Quellspalten: key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Das Tabellenschema bleibt unverändert. Nur die Spalten key und value werden aktualisiert/eingefügt. |
Das Tabellenschema wird in (key, value, new_value) geändert. Vorhandene Datensätze mit Übereinstimmungen werden mit value und new_value in der Quelle aktualisiert. Neue Zeilen werden mit dem Schema (key, value, new_value) eingefügt. |
Zielspalten: key, old_value Quellspalten: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
Die Aktionen UPDATE und INSERT lösen einen Fehler aus, weil die Zielspalte old_value in der Quelle nicht enthalten ist. |
Das Tabellenschema wird in (key, old_value, new_value) geändert. Vorhandene Datensätze mit Übereinstimmungen werden mit new_value in der Quelle aktualisiert, während old_value unverändert bleibt. Neue Datensätze werden mit dem angegebenen key , new_value und NULL für old_value eingefügt. |
Zielspalten: key, old_value Quellspalten: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE löst einen Fehler aus, weil die Spalte new_value in der Zieltabelle nicht enthalten ist. |
Das Tabellenschema wird in (key, old_value, new_value) geändert. Vorhandene Datensätze mit Übereinstimmungen werden mit dem new_value in der Quelle aktualisiert, wobei old_value unverändert bleibt, und bei nicht übereinstimmenden Datensätze wurde NULL als new_value eingegeben. Siehe den Hinweis (1). |
Zielspalten: key, old_value Quellspalten: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT löst einen Fehler aus, weil die Spalte new_value in der Zieltabelle nicht enthalten ist. |
Das Tabellenschema wird in (key, old_value, new_value) geändert. Neue Datensätze werden mit dem angegebenen key , new_value und NULL für old_value eingefügt. Bei vorhandenen Datensätzen wurde NULL als new_value eingegeben, wobei old_value unverändert gelassen wurde. Siehe den Hinweis (1). |
(1) Dieses Verhalten ist in Databricks Runtime 12.2 LTS und höher verfügbar; in Databricks Runtime 11.3 LTS und niedriger wird in diesem Fall ein Fehler angezeigt.
Ausschließen von Spalten mit Delta Lake-Merges
In Databricks Runtime 12.2 LTS und höher können Sie EXCEPT
-Klauseln in Mergebedingungen verwenden, um Spalten explizit auszuschließen. Das Verhalten des Schlüsselworts EXCEPT
hängt davon ab, ob die Schemaentwicklung aktiviert ist oder nicht.
Wenn die Schemaentwicklung deaktiviert ist, gilt das EXCEPT
-Schlüsselwort für die Liste der Spalten in der Zieltabelle und ermöglicht den Ausschluss von Spalten von UPDATE
- oder INSERT
-Aktionen. Ausgeschlossene Spalten werden auf null
festgelegt.
Wenn die Schemaentwicklung aktiviert ist, gilt das EXCEPT
-Schlüsselwort für die Liste der Spalten in der Quelltabelle und ermöglicht es, Spalten von der Schemaentwicklung auszuschließen. Eine neue Spalte in der Quelle, die im Ziel nicht vorhanden ist, wird dem Zielschema nicht hinzugefügt, wenn sie in der EXCEPT
-Klausel aufgeführt ist. Ausgeschlossene Spalten, die bereits im Ziel vorhanden sind, werden auf null
festgelegt.
Die folgenden Beispiele veranschaulichen diese Syntax:
Spalten | Abfrage (in SQL) | Verhalten ohne Schemaentwicklung (Standard) | Verhalten mit Schemaentwicklung |
---|---|---|---|
Zielspalten: id, title, last_updated Quellspalten: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
Übereinstimmende Zeilen werden aktualisiert, indem das Feld last_updated auf das aktuelle Datum festgelegt wird. Neue Zeilen werden mithilfe von Werten für id und title eingefügt. Das ausgeschlossene Feld last_updated ist auf null festgelegt. Das Feld review wird ignoriert, da es nicht im Ziel enthalten ist. |
Übereinstimmende Zeilen werden aktualisiert, indem das Feld last_updated auf das aktuelle Datum festgelegt wird. Das Schema wurde weiterentwickelt, um das Feld review hinzuzufügen. Neue Zeilen werden unter Verwendung aller Quellfelder mit Ausnahme von last_updated eingefügt, das auf null festgelegt ist. |
Zielspalten: id, title, last_updated Quellspalten: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT löst einen Fehler aus, weil die Spalte internal_count in der Zieltabelle nicht enthalten ist. |
Übereinstimmende Zeilen werden aktualisiert, indem das Feld last_updated auf das aktuelle Datum festgelegt wird. Das Feld review wird der Zieltabelle hinzugefügt, aber das Feld internal_count wird ignoriert. Bei neu eingefügten Zeilen wird last_updated auf null festgelegt. |
Handhabung von NullType
-Spalten bei Schemaaktualisierungen
Da Parquet NullType
nicht unterstützt, werden NullType
-Spalten beim Schreiben in Delta-Tabellen aus dem DataFrame gelöscht, aber weiterhin im Schema gespeichert. Wenn ein anderer Datentyp für diese Spalte empfangen wird, führt Delta Lake das Schema mit dem neuen Datentyp zusammen. Wenn Delta Lake einen NullType
für eine vorhandene Spalte empfängt, wird das alte Schema beibehalten, und die neue Spalte wird während des Schreibvorgangs gelöscht.
NullType
wird im Streaming nicht unterstützt. Da Sie bei Verwendung von Streaming Schemas festlegen müssen, sollte dieser Fall sehr selten vorkommen. NullType
wird außerdem für komplexe Typen wie ArrayType
und MapType
nicht akzeptiert.
Ersetzen von Tabellenschemas
Standardmäßig wird das Schema durch das Überschreiben der Daten in einer Tabelle nicht überschrieben. Wenn Sie eine Tabelle mit mode("overwrite")
ohne replaceWhere
überschreiben, möchten Sie vielleicht dennoch das Schema der geschriebenen Daten überschreiben. Sie ersetzen das Schema und die Partitionierung der Tabelle, indem Sie die Option overwriteSchema
auf true
festlegen:
df.write.option("overwriteSchema", "true")
Wichtig
Sie können overwriteSchema
nicht als true
angeben, wenn Sie die dynamische Partitionsüberschreibung verwenden.