Freigeben über


Aktualisieren eines Delta Lake-Tabellenschemas

Mit Delta Lake können Sie das Schema einer Tabelle aktualisieren. Die folgenden Typen von Änderungen werden unterstützt:

  • Hinzufügen neuer Spalten (an beliebigen Positionen)
  • Neuanordnen vorhandener Spalten
  • Umbenennen vorhandener Spalten

Sie können diese Änderungen explizit über die DDL oder implizit über die DML vornehmen.

Wichtig

Ein Update auf ein Delta-Tabellenschema ist ein Vorgang, der mit allen gleichzeitigen Delta-Schreibvorgängen in Konflikt steht.

Wenn Sie ein Delta-Tabellenschema aktualisieren, werden Streams, die aus dieser Tabelle lesen, beendet. Wenn der Stream fortgesetzt werden soll, müssen Sie ihn neu starten. Empfohlene Methoden finden Sie unter Produktionsüberlegungen für strukturiertes Streaming.

Explizites Aktualisieren des Schemas zum Hinzufügen von Spalten

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Die NULL-Zulässigkeit ist standardmäßig auf true festgelegt.

Um einem geschachtelten Feld eine Spalte hinzuzufügen, verwenden Sie folgenden Befehl:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Wenn das Schema vor der Ausführung von ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) z. B. folgendermaßen lautet:

- root
| - colA
| - colB
| +-field1
| +-field2

lautet das Schema nach der Ausführung folgendermaßen:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Hinweis

Das Hinzufügen geschachtelter Spalten wird nur für Strukturen unterstützt. Arrays und Zuordnungen werden nicht unterstützt.

Explizites Aktualisieren des Schemas zum Ändern des Kommentars oder der Reihenfolge von Spalten

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Um eine Spalte in einem geschachtelten Feld zu ändern, verwenden Sie folgenden Befehl:

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Wenn das Schema vor der Ausführung von ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST z. B. folgendermaßen lautet:

- root
| - colA
| - colB
| +-field1
| +-field2

lautet das Schema nach der Ausführung folgendermaßen:

- root
| - colA
| - colB
| +-field2
| +-field1

Explizites Aktualisieren des Schemas zum Ersetzen von Spalten

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

Bei der Ausführung der folgenden DDL-Anweisung:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

wenn das Schema vorher folgendermaßen lautet:

- root
| - colA
| - colB
| +-field1
| +-field2

lautet das Schema nach der Ausführung folgendermaßen:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

Explizites Aktualisieren des Schemas zum Umbenennen von Spalten

Hinweis

Dieses Feature ist in Databricks Runtime 10.4 LTS und höher verfügbar.

Um Spalten umzubenennen, ohne vorhandene Daten der Spalten neu zu schreiben, müssen Sie die Spaltenzuordnung für die Tabelle aktivieren. Weitere Informationen finden Sie unter Rename and drop columns with Delta Lake column mapping (Umbenennen und Löschen von Spalten mit Delta Lake-Spaltenzuordnung).

So benennen Sie eine Spalte um

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

So benennen Sie ein geschachteltes Feld um

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

Bei der Ausführung des folgenden Befehls:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

wenn das Schema vorher folgendermaßen lautet:

- root
| - colA
| - colB
| +-field1
| +-field2

lautet das Schema nach der Ausführung folgendermaßen:

- root
| - colA
| - colB
| +-field001
| +-field2

Weitere Informationen finden Sie unter Rename and drop columns with Delta Lake column mapping (Umbenennen und Löschen von Spalten mit Delta Lake-Spaltenzuordnung).

Explizites Aktualisieren des Schemas zum Löschen von Spalten

Hinweis

Dieses Feature ist in Databricks Runtime 11.3 LTS und höher verfügbar.

Wenn Sie Spalten im Rahmen eines reinen Metadatenvorgangs löschen möchten, ohne Datendateien neu zu schreiben, müssen Sie die Spaltenzuordnung für die Tabelle aktivieren. Weitere Informationen finden Sie unter Rename and drop columns with Delta Lake column mapping (Umbenennen und Löschen von Spalten mit Delta Lake-Spaltenzuordnung).

Wichtig

Wenn Sie eine Spalte über Metadaten löschen, werden die zugrunde liegenden Daten für die Spalte in Dateien nicht gelöscht. Zum Löschen der Daten der gelöschten Spalte können Sie REORG TABLE verwenden, um Dateien neu zu schreiben. Anschließend können Sie VACUUM verwenden, um die Dateien, die die Daten der gelöschten Spalte enthalten, physisch zu löschen.

So löschen Sie eine Spalte

ALTER TABLE table_name DROP COLUMN col_name

So löschen Sie mehrere Spalten

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

Explizites Aktualisieren des Schemas zum Ändern des Spaltentyps oder -namens

Sie können den Typ oder Namen einer Spalte ändern oder eine Spalte löschen, indem Sie die Tabelle neu schreiben. Verwenden Sie hierzu die Option overwriteSchema.

Das folgende Beispiel zeigt das Ändern eines Spaltentyps:

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Das folgende Beispiel zeigt das Ändern eines Spaltennamens:

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Aktivieren der Schemaentwicklung

Sie können die Schemaentwicklung aktivieren, indem Sie eine der folgenden Aktionen ausführen:

Databricks empfiehlt, die Schemaentwicklung für jeden Schreibvorgang zu aktivieren, anstatt eine Spark conf festzulegen.

Wenn Sie Optionen oder Syntax verwenden, um die Schemaentwicklung in einem Schreibvorgang zu aktivieren, hat dies Vorrang vor der Spark conf.

Hinweis

Es gibt keine Schemaentwicklungsklausel für INSERT INTO-Anweisungen.

Aktivieren der Schemaentwicklung für Schreibvorgänge, um neue Spalten hinzuzufügen

Spalten, die in der Quellabfrage vorhanden sind, aber in der Zieltabelle fehlen, werden automatisch als Teil einer Schreibtransaktion hinzugefügt, wenn die Schemaentwicklung aktiviert ist. Siehe Aktivieren der Schemaentwicklung.

Die Groß-/Kleinschreibung wird beim Anfügen einer neuen Spalte beibehalten. Neue Spalten werden am Ende des Tabellenschemas hinzugefügt. Wenn sich die zusätzlichen Spalten in einer Struktur befinden, werden sie am Ende der Struktur in der Zieltabelle angefügt.

Das folgende Beispiel veranschaulicht die Verwendung der Option mergeSchema mit dem Autoloader. Weitere Informationen finden Sie unter Automatisches Laden.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

Im folgenden Beispiel wird die Verwendung der Option mergeSchema mit einem Batchschreibvorgang veranschaulicht:

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

Automatische Schemaentwicklung für Delta Lake-Merge

Die Schemaentwicklung ermöglicht es Benutzern, Schemakonflikte zwischen der Ziel- und der Quelltabelle beim Zusammenführen zu beheben. Es werden die folgenden beiden Fälle behandelt:

  1. Eine Spalte in der Quelltabelle ist in der Zieltabelle nicht vorhanden. Die neue Spalte wird dem Zielschema hinzugefügt, und ihre Werte werden mithilfe der Quellwerte eingefügt oder aktualisiert.
  2. Eine Spalte in der Zieltabelle ist in der Quelltabelle nicht vorhanden. Das Zielschema bleibt unverändert. Die Werte in der zusätzlichen Zielspalte bleiben entweder unverändert (für UPDATE) oder werden auf NULL (für INSERT) festgelegt.

Sie müssen die automatische Schemaentwicklung manuell aktivieren. Siehe Aktivieren der Schemaentwicklung.

Hinweis

In Databricks Runtime 12.2 LTS und höher können Spalten und Strukturfelder, die in der Quelltabelle vorhanden sind, in Einfüge- oder Updateaktionen durch den Namen angegeben werden. In Databricks Runtime 11.3 LTS und niedriger können nur INSERT *- oder UPDATE SET *-Aktionen für die Schemaentwicklung mit Merge verwendet werden.

In Databricks Runtime 13.3 LTS und höher können Sie die Schemaentwicklung mit Strukturen verwenden, die in Zuordnungen geschachtelt sind, z. B. map<int, struct<a: int, b: int>>.

Schemaentwicklungssyntax für Merge

In Databricks Runtime 15.2 und höher können Sie die Schemaentwicklung in einer Zusammenführungsanweisung mithilfe von SQL oder Delta-Tabellen-APIs angeben:

SQL

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Python

from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

Beispielvorgänge für Merge mit Schemaentwicklung

Hier sind einige Beispiele für die Auswirkungen eines merge-Vorgangs mit und ohne Schemaentwicklung.

Spalten Abfrage (in SQL) Verhalten ohne Schemaentwicklung (Standard) Verhalten mit Schemaentwicklung
Zielspalten: key, value

Quellspalten: key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Das Tabellenschema bleibt unverändert. Nur die Spalten key und value werden aktualisiert/eingefügt. Das Tabellenschema wird in (key, value, new_value) geändert. Vorhandene Datensätze mit Übereinstimmungen werden mit value und new_value in der Quelle aktualisiert. Neue Zeilen werden mit dem Schema (key, value, new_value) eingefügt.
Zielspalten: key, old_value

Quellspalten: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Die Aktionen UPDATE und INSERT lösen einen Fehler aus, weil die Zielspalte old_value in der Quelle nicht enthalten ist. Das Tabellenschema wird in (key, old_value, new_value) geändert. Vorhandene Datensätze mit Übereinstimmungen werden mit new_value in der Quelle aktualisiert, während old_value unverändert bleibt. Neue Datensätze werden mit dem angegebenen key, new_valueund NULL für old_valueeingefügt.
Zielspalten: key, old_value

Quellspalten: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE löst einen Fehler aus, weil die Spalte new_value in der Zieltabelle nicht enthalten ist. Das Tabellenschema wird in (key, old_value, new_value) geändert. Vorhandene Datensätze mit Übereinstimmungen werden mit dem new_value in der Quelle aktualisiert, wobei old_value unverändert bleibt, und bei nicht übereinstimmenden Datensätze wurde NULL als new_value eingegeben. Siehe den Hinweis (1).
Zielspalten: key, old_value

Quellspalten: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT löst einen Fehler aus, weil die Spalte new_value in der Zieltabelle nicht enthalten ist. Das Tabellenschema wird in (key, old_value, new_value) geändert. Neue Datensätze werden mit dem angegebenen key, new_valueund NULL für old_valueeingefügt. Bei vorhandenen Datensätzen wurde NULL als new_value eingegeben, wobei old_value unverändert gelassen wurde. Siehe den Hinweis (1).

(1) Dieses Verhalten ist in Databricks Runtime 12.2 LTS und höher verfügbar; in Databricks Runtime 11.3 LTS und niedriger wird in diesem Fall ein Fehler angezeigt.

Ausschließen von Spalten mit Delta Lake-Merges

In Databricks Runtime 12.2 LTS und höher können Sie EXCEPT-Klauseln in Mergebedingungen verwenden, um Spalten explizit auszuschließen. Das Verhalten des Schlüsselworts EXCEPT hängt davon ab, ob die Schemaentwicklung aktiviert ist oder nicht.

Wenn die Schemaentwicklung deaktiviert ist, gilt das EXCEPT-Schlüsselwort für die Liste der Spalten in der Zieltabelle und ermöglicht den Ausschluss von Spalten von UPDATE- oder INSERT-Aktionen. Ausgeschlossene Spalten werden auf null festgelegt.

Wenn die Schemaentwicklung aktiviert ist, gilt das EXCEPT-Schlüsselwort für die Liste der Spalten in der Quelltabelle und ermöglicht es, Spalten von der Schemaentwicklung auszuschließen. Eine neue Spalte in der Quelle, die im Ziel nicht vorhanden ist, wird dem Zielschema nicht hinzugefügt, wenn sie in der EXCEPT-Klausel aufgeführt ist. Ausgeschlossene Spalten, die bereits im Ziel vorhanden sind, werden auf null festgelegt.

Die folgenden Beispiele veranschaulichen diese Syntax:

Spalten Abfrage (in SQL) Verhalten ohne Schemaentwicklung (Standard) Verhalten mit Schemaentwicklung
Zielspalten: id, title, last_updated

Quellspalten: id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
Übereinstimmende Zeilen werden aktualisiert, indem das Feld last_updated auf das aktuelle Datum festgelegt wird. Neue Zeilen werden mithilfe von Werten für id und titleeingefügt. Das ausgeschlossene Feld last_updated ist auf null festgelegt. Das Feld review wird ignoriert, da es nicht im Ziel enthalten ist. Übereinstimmende Zeilen werden aktualisiert, indem das Feld last_updated auf das aktuelle Datum festgelegt wird. Das Schema wurde weiterentwickelt, um das Feld reviewhinzuzufügen. Neue Zeilen werden unter Verwendung aller Quellfelder mit Ausnahme von last_updated eingefügt, das auf null festgelegt ist.
Zielspalten: id, title, last_updated

Quellspalten: id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT löst einen Fehler aus, weil die Spalte internal_count in der Zieltabelle nicht enthalten ist. Übereinstimmende Zeilen werden aktualisiert, indem das Feld last_updated auf das aktuelle Datum festgelegt wird. Das Feld review wird der Zieltabelle hinzugefügt, aber das Feld internal_count wird ignoriert. Bei neu eingefügten Zeilen wird last_updated auf null festgelegt.

Handhabung von NullType-Spalten bei Schemaaktualisierungen

Da Parquet NullType nicht unterstützt, werden NullType-Spalten beim Schreiben in Delta-Tabellen aus dem DataFrame gelöscht, aber weiterhin im Schema gespeichert. Wenn ein anderer Datentyp für diese Spalte empfangen wird, führt Delta Lake das Schema mit dem neuen Datentyp zusammen. Wenn Delta Lake einen NullType für eine vorhandene Spalte empfängt, wird das alte Schema beibehalten, und die neue Spalte wird während des Schreibvorgangs gelöscht.

NullType wird im Streaming nicht unterstützt. Da Sie bei Verwendung von Streaming Schemas festlegen müssen, sollte dieser Fall sehr selten vorkommen. NullType wird außerdem für komplexe Typen wie ArrayType und MapType nicht akzeptiert.

Ersetzen von Tabellenschemas

Standardmäßig wird das Schema durch das Überschreiben der Daten in einer Tabelle nicht überschrieben. Wenn Sie eine Tabelle mit mode("overwrite") ohne replaceWhere überschreiben, möchten Sie vielleicht dennoch das Schema der geschriebenen Daten überschreiben. Sie ersetzen das Schema und die Partitionierung der Tabelle, indem Sie die Option overwriteSchema auf true festlegen:

df.write.option("overwriteSchema", "true")

Wichtig

Sie können overwriteSchema nicht als true angeben, wenn Sie die dynamische Partitionsüberschreibung verwenden.