Sdílet prostřednictvím


Kurz: Vytváření a správa tabulek Delta Lake

Tento kurz ukazuje běžné operace tabulek Delta pomocí ukázkových dat. Delta Lake je optimalizovaná vrstva úložiště, která poskytuje základ pro tabulky v Databricks. Pokud není uvedeno jinak, všechny tabulky v Databricks jsou tabulky Delta.

Než začnete

K dokončení tohoto kurzu potřebujete:

  • Oprávnění k použití existujícího výpočetního prostředku nebo vytvoření nového výpočetního prostředku Viz Výpočty.
  • Oprávnění katalogu Unity: USE CATALOG, USE SCHEMA, a CREATE TABLE v katalogu workspace. Pokud chcete tato oprávnění nastavit, obraťte se na správce Databricks nebo oprávnění katalogu Unity a zabezpečitelné položky.

Tyto příklady využívají datovou sadu s názvem Syntetické záznamy osob: 10 tisíc až 10 milionů záznamů. Tato datová sada obsahuje fiktivní záznamy lidí, včetně jejich křestního jména a příjmení, pohlaví a věku.

Nejprve si stáhněte datovou sadu pro tento kurz.

  1. Navštivte stránku Syntetické záznamy osob: 10K až 10M záznamů na Kaggle.
  2. Klepněte na tlačítko Stáhnout a potom stáhnout datovou sadu jako zip. Tím se stáhne soubor s názvem archive.zip na místní počítač.
  3. archive Extrahujte složku ze archive.zip souboru.

Dále nahrajte person_10000.csv datovou sadu do svazku katalogu Unity v rámci pracovního prostoru Azure Databricks. Azure Databricks doporučuje nahrát data do svazku katalogu Unity, protože svazky poskytují možnosti pro přístup k souborům, jejich ukládání, řízení a uspořádání souborů.

  1. Kliknutím na ikonu Data otevřete Průzkumníka katalogu.Katalog na bočním panelu
  2. V Průzkumníku katalogu klikněte na Přidat nebo plus ikonuPřidat data a Vytvořit svazek.
  3. Pojmenujte svazek my-volume a jako typ svazku vyberte Spravovaný svazek .
  4. workspace Vyberte katalog a default schéma a potom klepněte na tlačítko Vytvořit.
  5. Otevřete my-volume a klikněte na Nahrát na tento svazek.
  6. Přetáhněte nebo přejděte do složky na místním počítači a vyberte person_10000.csv soubor ze archive složky.
  7. Klikněte na tlačítko Odeslat.

Nakonec vytvořte poznámkový blok pro spuštění ukázkového kódu.

  1. Na bočním panelu klikněte na ikonu Přidat nebo plus a poté na možnost Nový.
  2. Klikněte na ikonu Poznámkový blok.Poznámkový blok pro vytvoření nového poznámkového bloku
  3. Zvolte jazyk poznámkového bloku.

Vytvoření tabulky

Vytvořte novou tabulku spravovanou katalogem Unity s názvem workspace.default.people_10k z person_10000.csv. Delta Lake je výchozí nastavení pro všechny příkazy pro vytváření, čtení a zápis tabulek v Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")

# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)
))

val df = spark.read
  .format("csv")
  .option("header", true)
  .schema(schema)
  .load("/Volumes/workspace/default/my-volume/person_10000.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")

// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)

SQL

-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
  person_id AS id,
  firstname,
  lastname,
  gender,
  age
FROM read_files(
  '/Volumes/workspace/default/my-volume/person_10000.csv',
  format => 'csv',
  header => true
);

-- View the new table.
SELECT * FROM workspace.default.people_10k;

Tabulky můžete vytvářet nebo klonovat několika různými způsoby. Další informace najdete v tématu CREATE TABLE.

Ve službě Databricks Runtime 13.3 LTS a vyšší můžete vytvořit CREATE TABLE LIKE novou prázdnou tabulku Delta, která duplikuje vlastnosti schématu a tabulky zdrojové tabulky Delta. To může být užitečné při propagaci tabulek z vývojového prostředí do produkčního prostředí.

CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k

Důležité

Tato funkce je ve verzi Public Preview.

DeltaTableBuilder K vytvoření prázdné tabulky použijte rozhraní API pro Python a Scala. DataFrameWriter a DataFrameWriterV2 rozhraní v porovnání s rozhraním DeltaTableBuilder API usnadňuje zadávání dalších informací, jako jsou komentáře ke sloupcům, vlastnosti tabulky a generované sloupce.

Python

from delta.tables import DeltaTable

(
  DeltaTable.createIfNotExists(spark)
    .tableName("workspace.default.people_10k_2")
    .addColumn("id", "INT")
    .addColumn("firstName", "STRING")
    .addColumn("lastName", "STRING", comment="surname")
    .addColumn("gender", "STRING")
    .addColumn("age", "INT")
    .execute()
)

display(spark.read.table("workspace.default.people_10k_2"))

Scala

import io.delta.tables.DeltaTable

DeltaTable.createOrReplace(spark)
  .tableName("workspace.default.people_10k")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build()
  )
  .addColumn("gender", "STRING")
  .addColumn("age", "INT")
  .execute()

display(spark.read.table("workspace.default.people_10k"))

Upsert do tabulky

Upravte existující záznamy v tabulce nebo přidejte nové pomocí operace s názvem upsert. Pokud chcete sloučit sadu aktualizací a vložení do existující tabulky Delta, použijte metodu DeltaTable.merge v Pythonu a Scala a MERGE INTO příkaz v SQL.

Například sloučit data ze zdrojové tabulky people_10k_updates do cílové tabulky workspace.default.people_10kDelta . Pokud v obou tabulkách existuje odpovídající řádek, Delta Lake aktualizuje sloupec dat pomocí daného výrazu. Pokud neexistuje žádný odpovídající řádek, Delta Lake přidá nový řádek.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

data = [
  (10001, 'Billy', 'Luppitt', 'M', 55),
  (10002, 'Mary', 'Smith', 'F', 98),
  (10003, 'Elias', 'Leadbetter', 'M', 48),
  (10004, 'Jane', 'Doe', 'F', 30),
  (10005, 'Joshua', '', 'M', 90),
  (10006, 'Ginger', '', 'F', 16),
]

# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')

(deltaTable.alias("people_10k")
  .merge(
    people_10k_updates.alias("people_10k_updates"),
    "people_10k.id = people_10k_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)

Scala

import org.apache.spark.sql.types._
import io.delta.tables._

// Define schema
val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)
))

// Create data as Seq of Tuples
val data = Seq(
  (10001, "Billy", "Luppitt", "M", 55),
  (10002, "Mary", "Smith", "F", 98),
  (10003, "Elias", "Leadbetter", "M", 48),
  (10004, "Jane", "Doe", "F", 30),
  (10005, "Joshua", "", "M", 90),
  (10006, "Ginger", "", "F", 16)
)

// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
  "id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

deltaTable.as("people_10k")
  .merge(
    people_10k_updates.as("people_10k_updates"),
    "people_10k.id = people_10k_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)

SQL

-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
  id INT,
  firstName STRING,
  lastName STRING,
  gender STRING,
  age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
  (10001, "Billy", "Luppitt", "M", 55),
  (10002, "Mary", "Smith", "F", 98),
  (10003, "Elias", "Leadbetter", "M", 48),
  (10004, "Jane", "Doe", "F", 30),
  (10005, "Joshua", "", "M", 90),
  (10006, "Ginger", "", "F", 16);

-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *;

-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001

Operátor v * SQL aktualizuje nebo vloží všechny sloupce v cílové tabulce za předpokladu, že zdrojová tabulka má stejné sloupce jako cílová tabulka. Pokud cílová tabulka nemá stejné sloupce, dotaz vyvolá chybu analýzy. Při provádění operace vložení musíte také zadat hodnotu pro každý sloupec v tabulce. Hodnoty sloupců mohou být například ''prázdné . Při provádění operace vložení nemusíte aktualizovat všechny hodnoty.

Čtení tabulky

Pro přístup k datům v tabulkách Delta použijte název tabulky nebo cestu. Pro přístup ke spravovaným tabulkám Katalogu Unity použijte plně kvalifikovaný název tabulky. Přístup založený na cestě se podporuje jenom u svazků a externích tabulek, ne pro spravované tabulky. Další informace najdete v tématu Pravidla cesty a přístup ke svazkům katalogu Unity.

Python

people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

Scala

val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

SQL

SELECT * FROM workspace.default.people_10k;

Zápis do tabulky

Delta Lake používá standardní syntaxi pro zápis dat do tabulek. Pokud chcete přidat nová data do existující tabulky Delta, použijte režim připojení. Na rozdíl od upsertování zápis do tabulky nekontroluje duplicitní záznamy.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

data = [
  (10007, 'Miku', 'Hatsune', 'F', 25)
]

# Create the new data.
df  = spark.createDataFrame(data, schema)

# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")

# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)

Scala

// Create the new data.
val data = Seq(
  (10007, "Miku", "Hatsune", "F", 25)
)

val df = spark.createDataFrame(data)
  .toDF("id", "firstName", "lastName", "gender", "age")

// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")

// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)

SQL

CREATE OR REPLACE TABLE workspace.default.people_10k_new (
  id INT,
  firstName STRING,
  lastName STRING,
  gender STRING,
  age INT
);

-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
  (10007, 'Miku', 'Hatsune', 'F', 25);

-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;

-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;

Výstupy buněk poznámkového bloku Databricks zobrazují maximálně 10 000 řádků nebo 2 MB podle toho, co je nižší. Protože workspace.default.people_10k obsahuje více než 10 000 řádků, zobrazí se ve výstupu display(df)poznámkového bloku pouze prvních 10 000 řádků . Další řádky jsou přítomné v tabulce, ale nejsou vykresleny ve výstupu poznámkového bloku kvůli tomuto limitu. Další řádky můžete zobrazit tak, že je konkrétně vyfiltrujete.

Pokud chcete nahradit všechna data v tabulce, použijte režim přepsání.

Python

df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

Scala

df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

SQL

INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2

Aktualizace tabulky

Aktualizujte data v tabulce Delta na základě predikátu. Změňte například hodnoty ve sloupci gender z Female na F, z Male na M a z Other na O.

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'Female'",
  set = { "gender": "'F'" }
)

# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'Male',
  set = { 'gender': lit('M') }
)

deltaTable.update(
  condition = col('gender') == 'Other',
  set = { 'gender': lit('O') }
)

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'Female'",
  Map("gender" -> "'F'")
)

// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
  col("gender") === "Male",
  Map("gender" -> lit("M")));

deltaTable.update(
  col("gender") === "Other",
  Map("gender" -> lit("O")));

// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)

SQL

-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';

-- View the updated table.
SELECT * FROM workspace.default.people_10k;

Odstranit z tabulky

Odeberte data, která odpovídají predikátu z tabulky Delta. Následující kód například ukazuje dvě operace odstranění: nejprve odstraňte řádky, ve kterých je věk menší než 18, a pak odstraňte řádky, ve kterých je věk menší než 21.

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")

// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)

SQL

-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';

-- View the updated table.
SELECT * FROM workspace.default.people_10k;

Důležité

Odstranění odebere data z nejnovější verze tabulky Delta, ale neodebere je z fyzického úložiště, dokud se staré verze explicitně nevysadí. Další informace najdete v tématu vakuum.

Zobrazení historie tabulek

DeltaTable.history Pomocí metody v Pythonu a Scala a příkazu DESCRIBE HISTORY v SQL zobrazte informace o provenience pro každý zápis do tabulky.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

SQL

DESCRIBE HISTORY workspace.default.people_10k

Dotazování starší verze tabulky pomocí časového cestování

Dotazujte se na starší snímek Delta tabulky pomocí časového cestování v Delta Lake. Pokud chcete zadat dotaz na konkrétní verzi, použijte číslo verze tabulky nebo časové razítko. Například verze 0 dotazu nebo časové razítko 2026-01-05T23:09:47.000+00:00 z historie tabulky.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()

# Query using the version number.
display(deltaHistory.where("version == 0"))

# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()

// Query using the version number.
display(deltaHistory.where("version == 0"))

// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

SQL

-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;

-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';

U časových razítek se přijímají pouze řetězce data nebo časového razítka. Například řetězce musí být formátovány jako "2026-01-05T22:43:15.000+00:00" nebo "2026-01-05 22:43:15".

Využijte možností DataFrameReader k vytvoření datového rámce z Delta tabulky, která je určena na určitou verzi nebo časové razítko tabulky.

Python

# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")

# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")

display(df)

Scala

// Query using the version number.
val dfVersion = spark.read
  .option("versionAsOf", 0)
  .table("workspace.default.people_10k")

// Query using the timestamp.
val dfTimestamp = spark.read
  .option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
  .table("workspace.default.people_10k")

display(dfVersion)
display(dfTimestamp)

SQL

-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;

-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';

SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;

Další informace najdete v tématu Práce s historií tabulek.

Optimalizace tabulky

Několik změn v tabulce může vytvořit několik malých souborů, což zpomaluje výkon dotazů na čtení. Pomocí operace optimalizace můžete zrychlit kombinováním malých souborů do větších souborů. Viz OPTIMIZE.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE workspace.default.people_10k

Poznámka:

Pokud je povolená prediktivní optimalizace, nemusíte ji optimalizovat ručně. Prediktivní optimalizace automaticky spravuje úlohy údržby. Další informace najdete v tématu Prediktivní optimalizace spravovaných tabulek v katalogu Unity.

pořadí Z podle sloupců

Pokud chcete data v Z-řádu a dále zlepšit výkon čtení, zadejte sloupce, podle kterých se mají data uspořádat. Například seskupte podle sloupce firstName s vysokou kardinalitou. Další informace o řazení z najdete v tématu Vynechání dat.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

SQL

OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)

Vyčištění snímků pomocí operace vakua

Delta Lake má izolaci snímku při čtení, což znamená, že je bezpečné spustit operaci optimalizace, zatímco ostatní uživatelé nebo procesy provádějí dotazy na tabulku. Nakonec byste ale měli staré snímky vyčistit, protože tím snížíte náklady na úložiště, zvýšíte výkon dotazů a zajistíte dodržování předpisů u dat. Spuštěním VACUUM operace vyčistíte staré snímky. Viz VACUUM.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

SQL

VACUUM workspace.default.people_10k

Další informace o efektivním používání operace vakua naleznete v tématu Odebrání nepoužívaných datových souborů s vakuem.

Další kroky