Delta Lake-táblaséma frissítése
A Delta Lake lehetővé teszi egy tábla sémájának frissítését. A következő típusú módosítások támogatottak:
- Új oszlopok hozzáadása (tetszőleges pozícióban)
- Meglévő oszlopok átrendezése
- Meglévő oszlopok átnevezése
Ezeket a módosításokat explicit módon DDL használatával vagy implicit módon DML használatával is elvégezheti.
Fontos
A Delta-táblaséma frissítése olyan művelet, amely ütközik az összes egyidejű Delta írási művelettel.
Amikor frissíti egy Delta-tábla sémáját, megszűnnek az adott táblát olvasó streamek. Ha azt szeretné, hogy a stream folytatódjon, újra kell indítania. Az ajánlott módszerekért tekintse meg a strukturált streamelés éles környezettel kapcsolatos szempontjait.
Séma explicit frissítése oszlopok hozzáadásához
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Alapértelmezés szerint a nullabilitás a következő true
: .
Ha oszlopot szeretne hozzáadni egy beágyazott mezőhöz, használja a következőt:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Ha például a séma futtatása ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
előtt a következő:
- root
| - colA
| - colB
| +-field1
| +-field2
a séma a következő után következik:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Feljegyzés
Beágyazott oszlopok hozzáadása csak a szerkezetek esetében támogatott. A tömbök és a térképek nem támogatottak.
Explicit módon frissítse a sémát az oszlop megjegyzésének vagy sorrendjének módosításához
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Beágyazott mező oszlopának módosításához használja a következőt:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Ha például a séma futtatása ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
előtt a következő:
- root
| - colA
| - colB
| +-field1
| +-field2
a séma a következő után következik:
- root
| - colA
| - colB
| +-field2
| +-field1
Explicit módon frissítse a sémát az oszlopok cseréjéhez
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Például a következő DDL futtatásakor:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
ha az előző séma a következő:
- root
| - colA
| - colB
| +-field1
| +-field2
a séma a következő után következik:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Séma explicit frissítése oszlopok átnevezéséhez
Feljegyzés
Ez a funkció a Databricks Runtime 10.4 LTS és újabb verziókban érhető el.
Az oszlopok átnevezéséhez az oszlopok meglévő adatainak újraírása nélkül engedélyeznie kell az oszlopleképezést a táblához. Lásd: Oszlopok átnevezése és elvetése Delta Lake-oszlopleképezéssel.
Oszlop átnevezése:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Beágyazott mező átnevezése:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Ha például a következő parancsot futtatja:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Ha az előző séma a következő:
- root
| - colA
| - colB
| +-field1
| +-field2
Ezután a séma a következő:
- root
| - colA
| - colB
| +-field001
| +-field2
Lásd: Oszlopok átnevezése és elvetése Delta Lake-oszlopleképezéssel.
Explicit módon frissítse a sémát az oszlopok elvetésére
Feljegyzés
Ez a funkció a Databricks Runtime 11.3 LTS és újabb verziókban érhető el.
Ha csak metaadat-műveletként szeretné elvetni az oszlopokat adatfájlok újraírása nélkül, engedélyeznie kell a tábla oszlopleképezését. Lásd: Oszlopok átnevezése és elvetése Delta Lake-oszlopleképezéssel.
Fontos
Az oszlop metaadatokból való elvetése nem törli a fájlokban lévő oszlop alapjául szolgáló adatokat. Az elvetett oszlopadatok törléséhez a REORG TABLE használatával átírhatja a fájlokat. Ezután a VACUUM használatával fizikailag törölheti az elvetett oszlopadatokat tartalmazó fájlokat.
Oszlop elvetése:
ALTER TABLE table_name DROP COLUMN col_name
Több oszlop elvetése:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Explicit módon frissítse a sémát az oszloptípus vagy -név módosításához
Módosíthatja egy oszlop típusát vagy nevét, vagy elvethet egy oszlopot a tábla újraírásával. Ehhez használja a overwriteSchema
lehetőséget.
Az alábbi példa egy oszloptípus módosítását mutatja be:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Az alábbi példa egy oszlopnév módosítását mutatja be:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Sémafejlődés engedélyezése
A sémafejlődést az alábbi műveletek egyikével engedélyezheti:
- Állítsa be a
.option("mergeSchema", "true")
Spark DataFrame-etwrite
vagywriteStream
-műveletet. Lásd: Az írások sémafejlődésének engedélyezése új oszlopok hozzáadásához. - Szintaxis használata
MERGE WITH SCHEMA EVOLUTION
. Az egyesítéshez lásd a sémafejlődés szintaxisát. - Állítsa be a Spark conf-t
spark.databricks.delta.schema.autoMerge.enabled
true
az aktuális SparkSession értékre.
A Databricks azt javasolja, hogy a Spark-conf beállítása helyett engedélyezze az egyes írási műveletek sémafejlődését.
Ha beállításokat vagy szintaxist használ a sémafejlődés engedélyezéséhez egy írási műveletben, ez elsőbbséget élvez a Spark-konföderációval szemben.
Feljegyzés
Az utasításokhoz nincs sémafejlődési INSERT INTO
záradék.
Sémafejlődés engedélyezése írásokhoz új oszlopok hozzáadásához
A forrás lekérdezésben található, de a céltáblából hiányzó oszlopok automatikusan hozzáadódnak egy írási tranzakció részeként, ha engedélyezve van a sémafejlődés. Lásd: Sémafejlődés engedélyezése.
A kis- és nagybetűk megmaradnak egy új oszlop hozzáfűzésekor. A rendszer új oszlopokat ad hozzá a táblaséma végéhez. Ha a további oszlopok egy struktúra részét képezik, a rendszer hozzáfűzi őket a céltáblában lévő struktúra végéhez.
Az alábbi példa bemutatja, hogy az automatikus betöltővel használja a mergeSchema
lehetőséget. Lásd : Mi az automatikus betöltő?.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
Az alábbi példa azt mutatja be, hogy a mergeSchema
lehetőséget kötegírási művelettel használja:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
A Delta Lake-egyesítés automatikus sémafejlődése
A sémafejlődés lehetővé teszi a felhasználók számára a sémaeltérések feloldását a cél és a forrástábla egyesítésével. A következő két esetet kezeli:
- A forrástábla egyik oszlopa nem szerepel a céltáblában. Az új oszlop hozzáadódik a célsémához, és az értékei a forrásértékek használatával lesznek beszúrva vagy frissítve.
- A céltábla egyik oszlopa nem szerepel a forrástáblában. A célséma változatlan marad; a további céloszlop értékei változatlanok maradnak (for
UPDATE
) vagy (for ) értékreNULL
INSERT
vannak állítva.
Manuálisan kell engedélyeznie az automatikus sémafejlődést. Lásd: Sémafejlődés engedélyezése.
Feljegyzés
A Databricks Runtime 12.2 LTS-ben és újabb verziókban a forrástáblában található oszlopok és strukturált mezők név szerint adhatók meg a beszúrási vagy frissítési műveletekben. A Databricks Runtime 11.3 LTS-ben és alatta csak INSERT *
vagy UPDATE SET *
műveletek használhatók a sémafejlődéshez az egyesítéssel.
A Databricks Runtime 13.3 LTS-ben és újabb verziókban sémafejlődést használhat a térképekbe ágyazott szerkezetekkel, például map<int, struct<a: int, b: int>>
.
Sémafejlődési szintaxis az egyesítéshez
A Databricks Runtime 15.2-s és újabb verziójában sql- vagy Delta tábla API-k használatával megadhatja a sémafejlődést egy egyesítési utasításban:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Példaműveletek a sémafejlődéssel való egyesítéshez
Íme néhány példa a sémafejlődéssel és anélkül végzett működés hatásaira merge
.
(1) Ez a viselkedés a Databricks Runtime 12.2 LTS és újabb verziókban érhető el; Ebben a feltételben a Databricks Runtime 11.3 LTS és az alábbi hiba jelenik meg.
Oszlopok kizárása Delta Lake-egyesítéssel
A Databricks Runtime 12.2 LTS-ben és újabb EXCEPT
verziókban az oszlopok explicit kizárásához használhat záradékokat az egyesítési feltételekben. A kulcsszó viselkedése EXCEPT
attól függően változik, hogy engedélyezve van-e a sémafejlődés.
Ha a sémafejlődés le van tiltva, a EXCEPT
kulcsszó a céltáblában lévő oszlopok listájára vonatkozik, és lehetővé teszi az oszlopok kizárását a műveletekből vagy INSERT
műveletekbőlUPDATE
. A kizárt oszlopok értéke .null
Ha a sémafejlődés engedélyezve van, a EXCEPT
kulcsszó a forrástáblában lévő oszlopok listájára vonatkozik, és lehetővé teszi az oszlopok kizárását a sémafejlődésből. A forrás olyan új oszlopa, amely nem szerepel a célban, nem lesz hozzáadva a célsémához, ha szerepel a EXCEPT
záradékban. A célban már meglévő kizárt oszlopok a következőre null
vannak állítva: .
Az alábbi példák ezt a szintaxist mutatják be:
Oszlopok | Lekérdezés (SQL-ben) | Viselkedés sémafejlődés nélkül (alapértelmezett) | Viselkedés sémafejlődéssel |
---|---|---|---|
Céloszlopok: id, title, last_updated Forrásoszlopok: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
A egyeztetett sorok úgy frissülnek, hogy a last_updated mezőt az aktuális dátumra állítja. Az új sorok beszúrása a következő értékekkel id történik: és title . A kizárt mező last_updated értéke : null . A mező review figyelmen kívül lesz hagyva, mert nem szerepel a célban. |
A egyeztetett sorok úgy frissülnek, hogy a last_updated mezőt az aktuális dátumra állítja. A séma a mező review hozzáadásához fejlődik. Az új sorokat a program az összes forrásmezővel beszúrja, kivéve last_updated a következőt null : . |
Céloszlopok: id, title, last_updated Forrásoszlopok: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT hibát jelez, mert az oszlop internal_count nem létezik a céltáblában. |
A egyeztetett sorok úgy frissülnek, hogy a last_updated mezőt az aktuális dátumra állítja. A review mező hozzáadódik a céltáblához, de a internal_count mező figyelmen kívül lesz hagyva. A beszúrt új sorok a következőre null vannak last_updated állítva: . |
A sémafrissítések oszlopainak NullType
kezelése
Mivel a Parquet nem támogatja NullType
az oszlopokat, NullType
a Rendszer elveti az oszlopokat a DataFrame-ből a Delta-táblákba való íráskor, de továbbra is a sémában tárolja őket. Ha egy másik adattípus érkezik az oszlophoz, a Delta Lake egyesíti a sémát az új adattípussal. Ha a Delta Lake kap egy NullType
meglévő oszlopot, a régi séma megmarad, és az új oszlop el lesz dobva az írás során.
NullType
a streamelés nem támogatott. Mivel a streamelés használatakor sémákat kell beállítania, ez nagyon ritka lehet. NullType
nem fogadható el olyan összetett típusok esetében is, mint például ArrayType
a .MapType
Táblaséma cseréje
Alapértelmezés szerint a tábla adatainak felülírása nem írja felül a sémát. Ha felülír egy táblázatot mode("overwrite")
anélkül replaceWhere
, akkor is felülírhatja az éppen megírt adatok sémáját. A táblázat sémáját és particionálását a következő beállítással overwriteSchema
true
cserélheti le:
df.write.option("overwriteSchema", "true")
Fontos
A dinamikus partíció felülírása esetén nem adható meg overwriteSchema
true
.