Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tento kurz ukazuje běžné operace tabulek Delta pomocí ukázkových dat. Delta Lake je optimalizovaná vrstva úložiště, která poskytuje základ pro tabulky v Databricks. Pokud není uvedeno jinak, všechny tabulky v Databricks jsou tabulky Delta.
Než začnete
K dokončení tohoto kurzu potřebujete:
- Oprávnění k použití existujícího výpočetního prostředku nebo vytvoření nového výpočetního prostředku Viz Výpočty.
- Oprávnění katalogu Unity:
USE CATALOG,USE SCHEMA, aCREATE TABLEv kataloguworkspace. Pokud chcete tato oprávnění nastavit, obraťte se na správce Databricks nebo oprávnění katalogu Unity a zabezpečitelné položky.
Tyto příklady využívají datovou sadu s názvem Syntetické záznamy osob: 10 tisíc až 10 milionů záznamů. Tato datová sada obsahuje fiktivní záznamy lidí, včetně jejich křestního jména a příjmení, pohlaví a věku.
Nejprve si stáhněte datovou sadu pro tento kurz.
- Navštivte stránku Syntetické záznamy osob: 10K až 10M záznamů na Kaggle.
- Klepněte na tlačítko Stáhnout a potom stáhnout datovou sadu jako zip. Tím se stáhne soubor s názvem
archive.zipna místní počítač. -
archiveExtrahujte složku zearchive.zipsouboru.
Dále nahrajte person_10000.csv datovou sadu do svazku katalogu Unity v rámci pracovního prostoru Azure Databricks. Azure Databricks doporučuje nahrát data do svazku katalogu Unity, protože svazky poskytují možnosti pro přístup k souborům, jejich ukládání, řízení a uspořádání souborů.
- Kliknutím na
otevřete Průzkumníka katalogu.Katalog na bočním panelu
- V Průzkumníku katalogu klikněte na
Přidat data a Vytvořit svazek. - Pojmenujte svazek
my-volumea jako typ svazku vyberte Spravovaný svazek . -
workspaceVyberte katalog adefaultschéma a potom klepněte na tlačítko Vytvořit. - Otevřete
my-volumea klikněte na Nahrát na tento svazek. - Přetáhněte nebo přejděte do složky na místním počítači a vyberte
person_10000.csvsoubor zearchivesložky. - Klikněte na tlačítko Odeslat.
Nakonec vytvořte poznámkový blok pro spuštění ukázkového kódu.
- Na bočním panelu klikněte na
a poté na možnost Nový. - Klikněte na
Poznámkový blok pro vytvoření nového poznámkového bloku
- Zvolte jazyk poznámkového bloku.
Vytvoření tabulky
Vytvořte novou tabulku spravovanou katalogem Unity s názvem workspace.default.people_10k z person_10000.csv. Delta Lake je výchozí nastavení pro všechny příkazy pro vytváření, čtení a zápis tabulek v Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")
# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
val df = spark.read
.format("csv")
.option("header", true)
.schema(schema)
.load("/Volumes/workspace/default/my-volume/person_10000.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")
// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)
SQL
-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
person_id AS id,
firstname,
lastname,
gender,
age
FROM read_files(
'/Volumes/workspace/default/my-volume/person_10000.csv',
format => 'csv',
header => true
);
-- View the new table.
SELECT * FROM workspace.default.people_10k;
Tabulky můžete vytvářet nebo klonovat několika různými způsoby. Další informace najdete v tématu CREATE TABLE.
Ve službě Databricks Runtime 13.3 LTS a vyšší můžete vytvořit CREATE TABLE LIKE novou prázdnou tabulku Delta, která duplikuje vlastnosti schématu a tabulky zdrojové tabulky Delta. To může být užitečné při propagaci tabulek z vývojového prostředí do produkčního prostředí.
CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k
Důležité
Tato funkce je ve verzi Public Preview.
DeltaTableBuilder K vytvoření prázdné tabulky použijte rozhraní API pro Python a Scala.
DataFrameWriter a DataFrameWriterV2 rozhraní v porovnání s rozhraním DeltaTableBuilder API usnadňuje zadávání dalších informací, jako jsou komentáře ke sloupcům, vlastnosti tabulky a generované sloupce.
Python
from delta.tables import DeltaTable
(
DeltaTable.createIfNotExists(spark)
.tableName("workspace.default.people_10k_2")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("lastName", "STRING", comment="surname")
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
)
display(spark.read.table("workspace.default.people_10k_2"))
Scala
import io.delta.tables.DeltaTable
DeltaTable.createOrReplace(spark)
.tableName("workspace.default.people_10k")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build()
)
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
display(spark.read.table("workspace.default.people_10k"))
Upsert do tabulky
Upravte existující záznamy v tabulce nebo přidejte nové pomocí operace s názvem upsert. Pokud chcete sloučit sadu aktualizací a vložení do existující tabulky Delta, použijte metodu DeltaTable.merge v Pythonu a Scala a MERGE INTO příkaz v SQL.
Například sloučit data ze zdrojové tabulky people_10k_updates do cílové tabulky workspace.default.people_10kDelta . Pokud v obou tabulkách existuje odpovídající řádek, Delta Lake aktualizuje sloupec dat pomocí daného výrazu. Pokud neexistuje žádný odpovídající řádek, Delta Lake přidá nový řádek.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10001, 'Billy', 'Luppitt', 'M', 55),
(10002, 'Mary', 'Smith', 'F', 98),
(10003, 'Elias', 'Leadbetter', 'M', 48),
(10004, 'Jane', 'Doe', 'F', 30),
(10005, 'Joshua', '', 'M', 90),
(10006, 'Ginger', '', 'F', 16),
]
# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')
(deltaTable.alias("people_10k")
.merge(
people_10k_updates.alias("people_10k_updates"),
"people_10k.id = people_10k_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)
Scala
import org.apache.spark.sql.types._
import io.delta.tables._
// Define schema
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
// Create data as Seq of Tuples
val data = Seq(
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16)
)
// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
"id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.as("people_10k")
.merge(
people_10k_updates.as("people_10k_updates"),
"people_10k.id = people_10k_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)
SQL
-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16);
-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *;
-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001
Operátor v * SQL aktualizuje nebo vloží všechny sloupce v cílové tabulce za předpokladu, že zdrojová tabulka má stejné sloupce jako cílová tabulka. Pokud cílová tabulka nemá stejné sloupce, dotaz vyvolá chybu analýzy. Při provádění operace vložení musíte také zadat hodnotu pro každý sloupec v tabulce. Hodnoty sloupců mohou být například ''prázdné . Při provádění operace vložení nemusíte aktualizovat všechny hodnoty.
Čtení tabulky
Pro přístup k datům v tabulkách Delta použijte název tabulky nebo cestu. Pro přístup ke spravovaným tabulkám Katalogu Unity použijte plně kvalifikovaný název tabulky. Přístup založený na cestě se podporuje jenom u svazků a externích tabulek, ne pro spravované tabulky. Další informace najdete v tématu Pravidla cesty a přístup ke svazkům katalogu Unity.
Python
people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
Scala
val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
SQL
SELECT * FROM workspace.default.people_10k;
Zápis do tabulky
Delta Lake používá standardní syntaxi pro zápis dat do tabulek. Pokud chcete přidat nová data do existující tabulky Delta, použijte režim připojení. Na rozdíl od upsertování zápis do tabulky nekontroluje duplicitní záznamy.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10007, 'Miku', 'Hatsune', 'F', 25)
]
# Create the new data.
df = spark.createDataFrame(data, schema)
# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")
# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)
Scala
// Create the new data.
val data = Seq(
(10007, "Miku", "Hatsune", "F", 25)
)
val df = spark.createDataFrame(data)
.toDF("id", "firstName", "lastName", "gender", "age")
// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")
// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)
SQL
CREATE OR REPLACE TABLE workspace.default.people_10k_new (
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
(10007, 'Miku', 'Hatsune', 'F', 25);
-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;
-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;
Výstupy buněk poznámkového bloku Databricks zobrazují maximálně 10 000 řádků nebo 2 MB podle toho, co je nižší. Protože workspace.default.people_10k obsahuje více než 10 000 řádků, zobrazí se ve výstupu display(df)poznámkového bloku pouze prvních 10 000 řádků . Další řádky jsou přítomné v tabulce, ale nejsou vykresleny ve výstupu poznámkového bloku kvůli tomuto limitu. Další řádky můžete zobrazit tak, že je konkrétně vyfiltrujete.
Pokud chcete nahradit všechna data v tabulce, použijte režim přepsání.
Python
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
Scala
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
SQL
INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2
Aktualizace tabulky
Aktualizujte data v tabulce Delta na základě predikátu. Změňte například hodnoty ve sloupci gender z Female na F, z Male na M a z Other na O.
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'Female'",
set = { "gender": "'F'" }
)
# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'Male',
set = { 'gender': lit('M') }
)
deltaTable.update(
condition = col('gender') == 'Other',
set = { 'gender': lit('O') }
)
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'Female'",
Map("gender" -> "'F'")
)
// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
col("gender") === "Male",
Map("gender" -> lit("M")));
deltaTable.update(
col("gender") === "Other",
Map("gender" -> lit("O")));
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Odstranit z tabulky
Odeberte data, která odpovídají predikátu z tabulky Delta. Následující kód například ukazuje dvě operace odstranění: nejprve odstraňte řádky, ve kterých je věk menší než 18, a pak odstraňte řádky, ve kterých je věk menší než 21.
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Důležité
Odstranění odebere data z nejnovější verze tabulky Delta, ale neodebere je z fyzického úložiště, dokud se staré verze explicitně nevysadí. Další informace najdete v tématu vakuum.
Zobrazení historie tabulek
DeltaTable.history Pomocí metody v Pythonu a Scala a příkazu DESCRIBE HISTORY v SQL zobrazte informace o provenience pro každý zápis do tabulky.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
SQL
DESCRIBE HISTORY workspace.default.people_10k
Dotazování starší verze tabulky pomocí časového cestování
Dotazujte se na starší snímek Delta tabulky pomocí časového cestování v Delta Lake. Pokud chcete zadat dotaz na konkrétní verzi, použijte číslo verze tabulky nebo časové razítko. Například verze 0 dotazu nebo časové razítko 2026-01-05T23:09:47.000+00:00 z historie tabulky.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()
# Query using the version number.
display(deltaHistory.where("version == 0"))
# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()
// Query using the version number.
display(deltaHistory.where("version == 0"))
// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
SQL
-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
U časových razítek se přijímají pouze řetězce data nebo časového razítka. Například řetězce musí být formátovány jako "2026-01-05T22:43:15.000+00:00" nebo "2026-01-05 22:43:15".
Využijte možností DataFrameReader k vytvoření datového rámce z Delta tabulky, která je určena na určitou verzi nebo časové razítko tabulky.
Python
# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")
# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")
display(df)
Scala
// Query using the version number.
val dfVersion = spark.read
.option("versionAsOf", 0)
.table("workspace.default.people_10k")
// Query using the timestamp.
val dfTimestamp = spark.read
.option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
.table("workspace.default.people_10k")
display(dfVersion)
display(dfTimestamp)
SQL
-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;
Další informace najdete v tématu Práce s historií tabulek.
Optimalizace tabulky
Několik změn v tabulce může vytvořit několik malých souborů, což zpomaluje výkon dotazů na čtení. Pomocí operace optimalizace můžete zrychlit kombinováním malých souborů do větších souborů. Viz OPTIMIZE.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE workspace.default.people_10k
Poznámka:
Pokud je povolená prediktivní optimalizace, nemusíte ji optimalizovat ručně. Prediktivní optimalizace automaticky spravuje úlohy údržby. Další informace najdete v tématu Prediktivní optimalizace spravovaných tabulek v katalogu Unity.
pořadí Z podle sloupců
Pokud chcete data v Z-řádu a dále zlepšit výkon čtení, zadejte sloupce, podle kterých se mají data uspořádat. Například seskupte podle sloupce firstName s vysokou kardinalitou. Další informace o řazení z najdete v tématu Vynechání dat.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
SQL
OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)
Vyčištění snímků pomocí operace vakua
Delta Lake má izolaci snímku při čtení, což znamená, že je bezpečné spustit operaci optimalizace, zatímco ostatní uživatelé nebo procesy provádějí dotazy na tabulku. Nakonec byste ale měli staré snímky vyčistit, protože tím snížíte náklady na úložiště, zvýšíte výkon dotazů a zajistíte dodržování předpisů u dat. Spuštěním VACUUM operace vyčistíte staré snímky. Viz VACUUM.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
SQL
VACUUM workspace.default.people_10k
Další informace o efektivním používání operace vakua naleznete v tématu Odebrání nepoužívaných datových souborů s vakuem.