Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этом руководстве показаны распространенные операции с таблицами Delta с использованием примеров данных. Delta Lake — это оптимизированный уровень хранения, который предоставляет основу для таблиц в Databricks. Если иное не указано, все таблицы в Databricks являются разностными таблицами.
Перед тем как начать
Чтобы завершить работу с этим руководством, вам потребуется:
- Разрешение на использование существующего вычислительного ресурса или создание нового вычислительного ресурса. См. раздел "Вычисления".
- Разрешения каталога Unity:
USE CATALOG,USE SCHEMA, иCREATE TABLEв каталогеworkspace. Чтобы задать эти разрешения, обратитесь за помощью к администратору Databricks или ознакомьтесь с привилегиями и управляемыми объектами каталога Unity.
В этих примерах используется набор данных "Синтетические записи о людях: от 10 тыс. до 10 млн записей". Этот набор данных содержит вымышленные записи людей, включая их имена и фамилии, пол и возраст.
Сначала скачайте набор данных для этого руководства.
- Посетите страницу Записи о синтетических личностях: от 10 тыс. до 10 млн записей на сайте Kaggle.
- Нажмите кнопку "Скачать ", а затем скачайте набор данных в виде ZIP-файла. Файл
archive.zipскачивается на локальный компьютер. - Извлеките папку
archivearchive.zipиз файла.
Затем отправьте набор данных в person_10000.csvтом каталога Unity в рабочей области Azure Databricks. Azure Databricks рекомендует отправлять данные в том каталога Unity, так как тома предоставляют возможности для доступа, хранения, управления и организации файлов.
- Откройте обозреватель каталогов, щелкнув
Каталог на боковой панели.
- В обозревателе каталогов нажмите кнопку
"Добавить данные" и "Создать том". - Назовите том и выберите
my-volumeв качестве типа тома. -
workspaceВыберите каталог и схемуdefault, а затем нажмите кнопку "Создать". - Откройте
my-volumeи нажмите «Загрузить в этот том». - Перетащите или выберите
person_10000.csvфайл изarchiveпапки на локальном компьютере. - Нажмите кнопку Отправить.
Наконец, создайте записную книжку для выполнения примера кода.
- Нажмите
"Новый" на боковой панели. - Щелкните
Записная книжка для создания записной книжки.
- Выберите язык записной книжки.
Создание таблицы
Создайте управляемую таблицу каталога Unity с workspace.default.people_10kименемperson_10000.csv. Delta Lake — это значение по умолчанию для всех команд создания, чтения и записи в Azure Databricks.
Питон
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")
# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)
язык программирования Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
val df = spark.read
.format("csv")
.option("header", true)
.schema(schema)
.load("/Volumes/workspace/default/my-volume/person_10000.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")
// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)
SQL
-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
person_id AS id,
firstname,
lastname,
gender,
age
FROM read_files(
'/Volumes/workspace/default/my-volume/person_10000.csv',
format => 'csv',
header => true
);
-- View the new table.
SELECT * FROM workspace.default.people_10k;
Существует несколько различных способов создания или клонирования таблиц. Дополнительные сведения см. в разделе CREATE TABLE.
В Databricks Runtime 13.3 LTS и более поздних версиях можно использовать CREATE TABLE LIKE для создания новой пустой таблицы Delta, которая дублирует схему и свойства таблицы исходной таблицы Delta. Это может быть полезно при продвижении таблиц из среды разработки в рабочую среду.
CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
DeltaTableBuilder Используйте API для Python и Scala, чтобы создать пустую таблицу. По сравнению с DataFrameWriter и DataFrameWriterV2, API DeltaTableBuilder упрощает указание дополнительных сведений, таких как примечания столбцов, свойства таблицы и созданные столбцы.
Питон
from delta.tables import DeltaTable
(
DeltaTable.createIfNotExists(spark)
.tableName("workspace.default.people_10k_2")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("lastName", "STRING", comment="surname")
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
)
display(spark.read.table("workspace.default.people_10k_2"))
язык программирования Scala
import io.delta.tables.DeltaTable
DeltaTable.createOrReplace(spark)
.tableName("workspace.default.people_10k")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build()
)
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
display(spark.read.table("workspace.default.people_10k"))
Upsert в таблицу
Измените существующие записи в таблице или добавьте новые, используя операцию upsert. Чтобы объединить набор обновлений и вставок в существующую таблицу Delta, используйте DeltaTable.merge метод в Python и Scala и инструкцию MERGE INTO в SQL.
Например, слияние данных из исходной таблицы people_10k_updates с целевой таблицей workspace.default.people_10kDelta. Если в обеих таблицах есть совпадающая строка, Delta Lake обновляет столбец данных с использованием заданного выражения. Если совпадающая строка отсутствует, Delta Lake добавляет новую строку.
Питон
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10001, 'Billy', 'Luppitt', 'M', 55),
(10002, 'Mary', 'Smith', 'F', 98),
(10003, 'Elias', 'Leadbetter', 'M', 48),
(10004, 'Jane', 'Doe', 'F', 30),
(10005, 'Joshua', '', 'M', 90),
(10006, 'Ginger', '', 'F', 16),
]
# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')
(deltaTable.alias("people_10k")
.merge(
people_10k_updates.alias("people_10k_updates"),
"people_10k.id = people_10k_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)
язык программирования Scala
import org.apache.spark.sql.types._
import io.delta.tables._
// Define schema
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
// Create data as Seq of Tuples
val data = Seq(
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16)
)
// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
"id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.as("people_10k")
.merge(
people_10k_updates.as("people_10k_updates"),
"people_10k.id = people_10k_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)
SQL
-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16);
-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *;
-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001
В SQL * оператор обновляет или вставляет все столбцы в целевую таблицу, если исходная таблица имеет те же столбцы, что и целевая таблица. Если целевая таблица не имеет одинаковых столбцов, запрос выдает ошибку анализа. Кроме того, при выполнении операции вставки необходимо указать значение для каждого столбца в таблице. Значения столбцов могут быть пустыми, например ''. При выполнении операции вставки не требуется обновлять все значения.
Чтение из таблицы
Используйте имя таблицы или путь для доступа к данным в таблицах Delta. Чтобы получить доступ к управляемым таблицам каталога Unity, используйте полное имя таблицы. Доступ на основе пути поддерживается только для томов и внешних таблиц, а не для управляемых таблиц. Дополнительные сведения в разделе "Правила маршрутизации и доступ в Unity Catalog".
Питон
people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
язык программирования Scala
val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
SQL
SELECT * FROM workspace.default.people_10k;
Запись в таблицу
Delta Lake использует стандартный синтаксис для записи данных в таблицы. Чтобы добавить новые данные в существующую таблицу Delta, используйте режим добавления. В отличие от обновления или вставки, запись в таблицу не проверяет наличие повторяющихся записей.
Питон
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10007, 'Miku', 'Hatsune', 'F', 25)
]
# Create the new data.
df = spark.createDataFrame(data, schema)
# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")
# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)
язык программирования Scala
// Create the new data.
val data = Seq(
(10007, "Miku", "Hatsune", "F", 25)
)
val df = spark.createDataFrame(data)
.toDF("id", "firstName", "lastName", "gender", "age")
// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")
// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)
SQL
CREATE OR REPLACE TABLE workspace.default.people_10k_new (
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
(10007, 'Miku', 'Hatsune', 'F', 25);
-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;
-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;
Выходные данные ячеек записной книжки Databricks отображают максимум 10 000 строк или 2 МБ, в зависимости от того, что меньше. Так как workspace.default.people_10k содержит более 10 000 строк, в выходных данных display(df)записной книжки отображаются только первые 10 000 строк. Дополнительные строки присутствуют в таблице, но не отображаются в выходных данных записной книжки из-за этого ограничения. Дополнительные строки можно просмотреть, специально отфильтровав их.
Чтобы заменить все данные в таблице, используйте режим перезаписи.
Питон
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
язык программирования Scala
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
SQL
INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2
Обновление таблицы
Обновите данные в таблице Delta на основе предиката. Например, измените значения в столбце gender с Female на F, с Male на M, и с Other на O.
Питон
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'Female'",
set = { "gender": "'F'" }
)
# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'Male',
set = { 'gender': lit('M') }
)
deltaTable.update(
condition = col('gender') == 'Other',
set = { 'gender': lit('O') }
)
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
язык программирования Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'Female'",
Map("gender" -> "'F'")
)
// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
col("gender") === "Male",
Map("gender" -> lit("M")));
deltaTable.update(
col("gender") === "Other",
Map("gender" -> lit("O")));
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Удаление из таблицы
Удалите данные, соответствующие предикату из таблицы Delta. Например, приведенный ниже код демонстрирует две операции удаления: сначала удаляет строки, в которых возраст меньше 18, а затем удаляет строки, в которых возраст меньше 21.
Питон
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
язык программирования Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Внимание
Удаление удаляет данные из последней версии таблицы Delta, но не удаляет их из физического хранилища до тех пор, пока старые версии не будут явно вакуумированы. Дополнительные сведения см. в разделе "Вакуум".
Отображение журнала таблиц
Используйте метод DeltaTable.history в Python и Scala, а также инструкцию DESCRIBE HISTORY в SQL, чтобы просмотреть информацию о происхождении для каждой операции записи в таблицу.
Питон
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
язык программирования Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
SQL
DESCRIBE HISTORY workspace.default.people_10k
Запрос более ранней версии таблицы с помощью перемещения по времени
Запросите более ранний моментальный снимок таблицы Delta, используя функционал Delta Lake для работы с временными записями. Чтобы запросить определенную версию, используйте номер версии таблицы или метку времени. Например, версия 0 запроса или метка 2026-01-05T23:09:47.000+00:00 времени из журнала таблицы.
Питон
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()
# Query using the version number.
display(deltaHistory.where("version == 0"))
# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
язык программирования Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()
// Query using the version number.
display(deltaHistory.where("version == 0"))
// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
SQL
-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
Для меток времени принимаются только строки даты или метки времени. Например, строки должны быть отформатированы как "2026-01-05T22:43:15.000+00:00" или "2026-01-05 22:43:15".
Используйте DataFrameReader параметры для создания DataFrame из таблицы Delta, зафиксированной на определенной версии или метке времени таблицы.
Питон
# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")
# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")
display(df)
язык программирования Scala
// Query using the version number.
val dfVersion = spark.read
.option("versionAsOf", 0)
.table("workspace.default.people_10k")
// Query using the timestamp.
val dfTimestamp = spark.read
.option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
.table("workspace.default.people_10k")
display(dfVersion)
display(dfTimestamp)
SQL
-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;
Дополнительные сведения см. в разделе "Работа с журналом таблиц".
Оптимизация таблицы
Несколько изменений в таблице могут создавать несколько небольших файлов, что замедляет производительность запросов чтения. Используйте операцию оптимизации для повышения скорости путем объединения небольших файлов в более крупные. См. OPTIMIZE.
Питон
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
язык программирования Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE workspace.default.people_10k
Примечание.
Если включена прогнозная оптимизация, вам не нужно оптимизировать вручную. Прогнозная оптимизация автоматически управляет задачами обслуживания. Дополнительные сведения см. в разделе "Прогнозная оптимизация" для управляемых таблиц каталога Unity.
Z-порядок по столбцам
Чтобы упорядочить данные в порядке z и повысить производительность чтения, укажите столбцы, по которым будет производиться упорядочивание в операции. Например, сортировка по столбцу firstNameвысокой кратности. Дополнительные сведения о z-упорядочении см. в разделе Пропуск данных.
Питон
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
язык программирования Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
SQL
OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)
Очистка моментальных снимков с помощью операции вакуума
Delta Lake имеет изоляцию моментальных снимков для операций чтения, что означает, что безопасно выполнять операцию оптимизации, пока другие пользователи или задания запрашивают таблицу. Однако в конце концов необходимо удалить старые моментальные снимки, так как это снижает затраты на хранение, повышает производительность запросов и обеспечивает соблюдение требований к данным. Выполните операцию VACUUM, чтобы очистить старые моментальные снимки. См. VACUUM.
Питон
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
язык программирования Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
SQL
VACUUM workspace.default.people_10k
Дополнительные сведения об эффективном использовании операции вакуума см. в разделе "Удаление неиспользуемых файлов данных с помощью вакуума".