Dela via


Självstudie: Skapa och hantera Delta Lake-tabeller

Den här handledningen visar vanliga Delta-tabelloperationer med hjälp av exempeldata. Delta Lake är det optimerade lagringslagret som utgör grunden för tabeller i Databricks. Om inget annat anges är alla tabeller i Databricks Delta-tabeller.

Innan du börjar

För att slutföra denna handledning behöver du:

  • Behörighet att använda en befintlig beräkningsresurs eller skapa en ny beräkningsresurs. Se Beräkning.
  • Behörigheter för Unity-katalogen: USE CATALOG, USE SCHEMAoch CREATE TABLE i workspace katalogen. För att ange dessa behörigheter, kontakta din Databricks-administratör eller se Unity Catalog-behörigheter och skyddbara objekt.

De här exemplen förlitar sig på en datauppsättning med namnet Syntetiska personregister: 10 000 till 10 miljoner poster. Den här datamängden innehåller fiktiva poster över personer, inklusive deras för- och efternamn, kön och ålder.

Ladda först ned datauppsättningen för den här självstudien.

  1. Besök sidan Syntetiska personuppgifter: 10 000 till 10 miljoner poster på Kaggle.
  2. Klicka på Ladda ned och sedan på Ladda ned datauppsättning som zip. Detta laddar ned en fil med namnet archive.zip till den lokala datorn.
  3. archive Extrahera mappen från archive.zip filen.

Ladda sedan upp datamängden person_10000.csv till en Unity Catalog-volym på din Azure Databricks-arbetsyta. Azure Databricks rekommenderar att du laddar upp dina data till en Unity Catalog-volym eftersom volymer ger funktioner för åtkomst, lagring, styrning och organisering av filer.

  1. Öppna Katalogutforskaren genom att klicka på dataikonen.Katalog i sidofältet.
  2. I Katalogutforskaren klickar du på Lägg till eller plus-ikonenLägg till data och Skapa en volym.
  3. Namnge volymen my-volume och välj Hanterad volym som volymtyp.
  4. workspace Välj katalogen och default schemat och klicka sedan på Skapa.
  5. Öppna my-volume och klicka på Ladda upp till den här volymen.
  6. Dra och släpp eller bläddra till och välj person_10000.csv filen från mappen archive på den lokala datorn.
  7. Klicka på Överför.

Skapa slutligen en notebook-fil för att köra exempelkoden.

  1. Klicka på Lägg till eller plusikonNy i sidopanelen.
  2. Klicka på Notebook-ikonen.Notebook-fil för att skapa en ny notebook-fil.
  3. Välj ett språk för notebook-filen.

Skapa en tabell

Skapa en ny hanterad Unity Catalog-tabell med namnet workspace.default.people_10k från person_10000.csv. Delta Lake är standard för alla kommandon för att skapa, läsa och skriva tabeller i Azure Databricks.

python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")

# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)
))

val df = spark.read
  .format("csv")
  .option("header", true)
  .schema(schema)
  .load("/Volumes/workspace/default/my-volume/person_10000.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")

// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)

SQL

-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
  person_id AS id,
  firstname,
  lastname,
  gender,
  age
FROM read_files(
  '/Volumes/workspace/default/my-volume/person_10000.csv',
  format => 'csv',
  header => true
);

-- View the new table.
SELECT * FROM workspace.default.people_10k;

Det finns flera olika sätt att skapa eller klona tabeller. Mer information finns i CREATE TABLE.

I Databricks Runtime 13.3 LTS och senare kan du använda CREATE TABLE LIKE för att skapa en ny tom Delta-tabell som duplicerar schema- och tabellegenskaperna för en Delta-källtabell. Detta kan vara användbart när du befordrar tabeller från en utvecklingsmiljö till produktion.

CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

Använd API:et DeltaTableBuilder för Python och Scala för att skapa en tom tabell. DataFrameWriter Jämfört med och DataFrameWriterV2gör API:et DeltaTableBuilder det enklare att ange ytterligare information som kolumnkommenterar, tabellegenskaper och genererade kolumner.

python

from delta.tables import DeltaTable

(
  DeltaTable.createIfNotExists(spark)
    .tableName("workspace.default.people_10k_2")
    .addColumn("id", "INT")
    .addColumn("firstName", "STRING")
    .addColumn("lastName", "STRING", comment="surname")
    .addColumn("gender", "STRING")
    .addColumn("age", "INT")
    .execute()
)

display(spark.read.table("workspace.default.people_10k_2"))

Scala

import io.delta.tables.DeltaTable

DeltaTable.createOrReplace(spark)
  .tableName("workspace.default.people_10k")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build()
  )
  .addColumn("gender", "STRING")
  .addColumn("age", "INT")
  .execute()

display(spark.read.table("workspace.default.people_10k"))

Upsert till en tabell

Ändra befintliga poster i en tabell eller lägg till nya med hjälp av en åtgärd som kallas upsert. Om du vill sammanfoga en uppsättning uppdateringar och infogningar i en befintlig Delta-tabell använder du DeltaTable.merge metoden i Python och Scala och -instruktionen MERGE INTO i SQL.

Du kan till exempel slå samman data från källtabellen people_10k_updates till Delta-måltabellen workspace.default.people_10k. När det finns en matchande rad i båda tabellerna uppdaterar Delta Lake datakolumnen med det angivna uttrycket. När det inte finns någon matchande rad lägger Delta Lake till en ny rad.

python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

data = [
  (10001, 'Billy', 'Luppitt', 'M', 55),
  (10002, 'Mary', 'Smith', 'F', 98),
  (10003, 'Elias', 'Leadbetter', 'M', 48),
  (10004, 'Jane', 'Doe', 'F', 30),
  (10005, 'Joshua', '', 'M', 90),
  (10006, 'Ginger', '', 'F', 16),
]

# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')

(deltaTable.alias("people_10k")
  .merge(
    people_10k_updates.alias("people_10k_updates"),
    "people_10k.id = people_10k_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)

Scala

import org.apache.spark.sql.types._
import io.delta.tables._

// Define schema
val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("gender", StringType, true),
  StructField("age", IntegerType, true)
))

// Create data as Seq of Tuples
val data = Seq(
  (10001, "Billy", "Luppitt", "M", 55),
  (10002, "Mary", "Smith", "F", 98),
  (10003, "Elias", "Leadbetter", "M", 48),
  (10004, "Jane", "Doe", "F", 30),
  (10005, "Joshua", "", "M", 90),
  (10006, "Ginger", "", "F", 16)
)

// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
  "id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

deltaTable.as("people_10k")
  .merge(
    people_10k_updates.as("people_10k_updates"),
    "people_10k.id = people_10k_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)

SQL

-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
  id INT,
  firstName STRING,
  lastName STRING,
  gender STRING,
  age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
  (10001, "Billy", "Luppitt", "M", 55),
  (10002, "Mary", "Smith", "F", 98),
  (10003, "Elias", "Leadbetter", "M", 48),
  (10004, "Jane", "Doe", "F", 30),
  (10005, "Joshua", "", "M", 90),
  (10006, "Ginger", "", "F", 16);

-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *;

-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001

I SQL uppdaterar eller infogar operatorn * alla kolumner i måltabellen, förutsatt att källtabellen har samma kolumner som måltabellen. Om måltabellen inte har samma kolumner utlöser frågan ett analysfel. Du måste också ange ett värde för varje kolumn i tabellen när du utför en infogningsåtgärd. Kolumnvärdena kan vara tomma, till exempel ''. När du utför en infogningsåtgärd behöver du inte uppdatera alla värden.

Läsa en tabell

Använd tabellnamnet eller sökvägen för att komma åt data i Delta-tabeller. Om du vill komma åt hanterade tabeller i Unity Catalog använder du ett fullständigt kvalificerat tabellnamn. Sökvägsbaserad åtkomst stöds endast för volymer och externa tabeller, inte för hanterade tabeller. Mer information finns i Sökvägsregler och åtkomst i Unity Catalog-volymer.

python

people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

Scala

val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

SQL

SELECT * FROM workspace.default.people_10k;

Skriva till en tabell

Delta Lake använder standardsyntaxen för att skriva data till tabeller. Om du vill lägga till nya data i en befintlig Delta-tabell använder du tilläggsläget. Till skillnad från upserting söker du inte efter dubblettposter när du skriver till en tabell.

python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("age", IntegerType(), True)
])

data = [
  (10007, 'Miku', 'Hatsune', 'F', 25)
]

# Create the new data.
df  = spark.createDataFrame(data, schema)

# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")

# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)

Scala

// Create the new data.
val data = Seq(
  (10007, "Miku", "Hatsune", "F", 25)
)

val df = spark.createDataFrame(data)
  .toDF("id", "firstName", "lastName", "gender", "age")

// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")

// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)

SQL

CREATE OR REPLACE TABLE workspace.default.people_10k_new (
  id INT,
  firstName STRING,
  lastName STRING,
  gender STRING,
  age INT
);

-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
  (10007, 'Miku', 'Hatsune', 'F', 25);

-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;

-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;

Databricks notebook-cellutgångar visar högst 10 000 rader eller 2 MB, beroende på vilket som är minst. Eftersom workspace.default.people_10k innehåller mer än 10 000 rader visas endast de första 10 000 raderna i notebook-utdata för display(df). De ytterligare raderna finns i tabellen, men återges inte i notebook-utdata på grund av den här gränsen. Du kan visa de ytterligare raderna genom att specifikt filtrera efter dem.

Om du vill ersätta alla data i en tabell använder du överskrivningsläget.

python

df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

Scala

df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

SQL

INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2

Uppdatera en tabell

Uppdatera data i en Delta-tabell baserat på ett predikat. Du kan till exempel ändra värdena i gender kolumnen från Female till F, från Male till Moch från Other till O.

python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'Female'",
  set = { "gender": "'F'" }
)

# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'Male',
  set = { 'gender': lit('M') }
)

deltaTable.update(
  condition = col('gender') == 'Other',
  set = { 'gender': lit('O') }
)

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'Female'",
  Map("gender" -> "'F'")
)

// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
  col("gender") === "Male",
  Map("gender" -> lit("M")));

deltaTable.update(
  col("gender") === "Other",
  Map("gender" -> lit("O")));

// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)

SQL

-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';

-- View the updated table.
SELECT * FROM workspace.default.people_10k;

Ta bort från en tabell

Ta bort data som matchar ett predikat från en Delta-tabell. Koden nedan visar till exempel två borttagningsåtgärder: först tar du bort rader där åldern är mindre än 18 och tar sedan bort rader där åldern är mindre än 21.

python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")

// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)

SQL

-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';

-- View the updated table.
SELECT * FROM workspace.default.people_10k;

Viktigt!

Borttagning tar bort data från den senaste versionen av Delta-tabellen, men tar inte bort dem från fysisk lagring förrän de gamla versionerna uttryckligen har rensats. Mer information finns i vakuum.

Visa tabellhistorik

DeltaTable.history Använd metoden i Python och Scala och -instruktionen DESCRIBE HISTORY i SQL för att visa proveniensinformationen för varje skrivning till en tabell.

python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

SQL

DESCRIBE HISTORY workspace.default.people_10k

Fråga en tidigare version av tabellen med hjälp av tidsresor

Fråga en äldre ögonblicksbild av en Delta-tabell med hjälp av Delta Lake-tidsresor. Om du vill köra frågor mot en viss version använder du tabellens versionsnummer eller tidsstämpel. Till exempel frågeversion 0 eller tidsstämpel 2026-01-05T23:09:47.000+00:00 från tabellens historik.

python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()

# Query using the version number.
display(deltaHistory.where("version == 0"))

# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()

// Query using the version number.
display(deltaHistory.where("version == 0"))

// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

SQL

-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;

-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';

För tidsstämplar godkänns endast datum- eller tidsstämpelsträngar. Strängar måste till exempel formateras som "2026-01-05T22:43:15.000+00:00" eller "2026-01-05 22:43:15".

Använd DataFrameReader alternativen för att skapa en DataFrame från en Delta-tabell som är fixerad till en specifik version eller tidsstämpel av tabellen.

python

# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")

# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")

display(df)

Scala

// Query using the version number.
val dfVersion = spark.read
  .option("versionAsOf", 0)
  .table("workspace.default.people_10k")

// Query using the timestamp.
val dfTimestamp = spark.read
  .option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
  .table("workspace.default.people_10k")

display(dfVersion)
display(dfTimestamp)

SQL

-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;

-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';

SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;

Mer information finns i Arbeta med tabellhistorik.

Optimera en tabell

Flera ändringar i en tabell kan skapa flera små filer, vilket gör att läsfrågans prestanda blir långsammare. Använd optimeringsåtgärden för att förbättra hastigheten genom att kombinera små filer till större filer. Se även OPTIMIZE.

python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE workspace.default.people_10k

Kommentar

Om förutsägande optimering är aktiverat behöver du inte optimera manuellt. Förutsägande optimering hanterar automatiskt underhållsaktiviteter. Mer information finns i Förutsägande optimering för hanterade Unity Catalog-tabeller.

Z-ordning efter kolumner

För att z-ordna data och ytterligare förbättra läsprestanda, ange de kolumner som ska ordnas i operationen. Du kan till exempel sortera efter kolumnen firstNamemed hög kardinalitet . Mer information om z-ordning finns i Data skipping.

python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

SQL

OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)

Rensa ögonblicksbilder med vakuumåtgärden

Delta Lake har ögonblicksbildsisolering vid läsningar, vilket innebär att det är säkert att köra en optimeringsåtgärd medan andra användare eller jobb frågar tabellen. Du bör dock så småningom rensa gamla ögonblicksbilder eftersom det minskar lagringskostnaderna, förbättrar frågeprestanda och säkerställer dataefterlevnad. Kör åtgärden VACUUM för att rensa gamla ögonblicksbilder. Se även VACUUM.

python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

SQL

VACUUM workspace.default.people_10k

Mer information om hur du använder vakuumåtgärden effektivt finns i Ta bort oanvända datafiler med vakuum.

Nästa steg