Поделиться через


Руководство по Delta Lake

В этом руководстве представлены общие операции Delta Lake в Azure Databricks, в том числе следующие:

Пример кода Python, Scala и SQL в этой статье можно запустить из записной книжки , подключенной к вычислительному ресурсу Azure Databricks, например кластеру. Вы также можете выполнить приведенный в этой статье код SQL из запроса, связанного с хранилищем SQL в Databricks SQL.

Подготовка исходных данных

В этом руководстве используется набор данных с именем People 10 M. Он содержит 10 миллионов вымышленных записей, которые содержат факты о людях, таких как первые и фамилии, дата рождения и зарплата. В этом руководстве предполагается, что этот набор данных находится в томе каталога Unity, связанном с целевой рабочей областью Azure Databricks.

Чтобы получить набор данных People 10 M для этого руководства, сделайте следующее:

  1. Перейдите на страницу "Люди 10 M" в Kaggle.
  2. Нажмите кнопку "Скачать ", чтобы скачать файл с именем archive.zip на локальный компьютер.
  3. Извлеките файл с именем export.csv из archive.zip файла. Файл export.csv содержит данные для этого руководства.

Чтобы отправить export.csv файл в том, сделайте следующее:

  1. На боковой панели щелкните "Каталог".
  2. В обозревателе каталогов найдите и откройте том, в котором нужно отправить export.csv файл.
  3. Нажмите кнопку " Отправить в этот том".
  4. Перетащите и удалите файл или export.csv выберите файл на локальном компьютере.
  5. Нажмите кнопку Отправить.

В следующих примерах кода замените /Volumes/main/default/my-volume/export.csv путь к файлу в целевом export.csv томе.

Создание таблицы

Таблицы, созданные в Azure Databricks, по умолчанию используют Delta Lake. Databricks рекомендует использовать управляемые таблицы каталога Unity.

В предыдущем примере кода и в следующих примерах кода замените имя таблицы целевым трехкомпонентным каталогом, схемой и именем main.default.people_10m таблицы в каталоге Unity.

Примечание.

Delta Lake — это значение по умолчанию для всех команд создания таблиц и операций чтения, записи и таблицы Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Предыдущие операции создают новую управляемую таблицу. Сведения о доступных параметрах при создании разностной таблицы см. в статье CREATE TABLE.

В Databricks Runtime 13.3 LTS и более поздних версиях можно использовать create TABLE LIKE , чтобы создать пустую таблицу Delta, которая дублирует свойства схемы и таблицы для исходной разностной таблицы. Это может быть особенно полезно при продвижении таблиц из среды разработки в рабочую среду, как показано в следующем примере кода:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Чтобы создать пустую таблицу, можно также использовать DeltaTableBuilder API в Delta Lake для Python и Scala. По сравнению с эквивалентными API DataFrameWriter эти API упрощают указание дополнительных сведений, таких как комментарии к столбцам, свойства таблицы и созданные столбцы.

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Upsert в таблицу

Чтобы объединить набор обновлений и вставок в существующую таблицу Delta, используйте DeltaTable.merge метод для Python и Scala, а также инструкцию MERGE INTO для SQL. Например, следующий пример принимает данные из исходной таблицы и объединяет его в целевую таблицу Delta. Если в обеих таблицах есть совпадающая строка, Delta Lake обновляет столбец данных с использованием заданного выражения. Если совпадающая строка отсутствует, Delta Lake добавляет новую строку. Эта операция называется upsert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

При указании *в SQL это обновление или вставка всех столбцов в целевую таблицу предполагается, что исходная таблица имеет те же столбцы, что и целевая таблица. Если целевая таблица не имеет одинаковых столбцов, запрос выдает ошибку анализа.

При выполнении операции вставки необходимо указать значение для каждого столбца в таблице (например, если в существующем наборе данных нет соответствующей строки). Однако обновлять все значения не требуется.

Чтобы просмотреть результаты, отправьте к таблице запрос.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Чтение из таблицы

Доступ к данным в разностных таблицах можно получить по имени таблицы или пути к таблице, как показано в следующих примерах:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Запись в таблицу

Delta Lake использует стандартный синтаксис для записи данных в таблицы.

Чтобы атомарно добавить новые данные в существующую таблицу Delta, используйте режим добавления, как показано в следующих примерах:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Чтобы заменить все данные в таблице, используйте режим перезаписи, как показано в следующих примерах:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Обновление таблицы

Вы можете обновить данные, соответствующие предикату в таблице Delta. Например, в таблице примера people_10m можно изменить сокращение столбца из M столбца gender или FMale на или Femaleвыполнить следующее:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Удаление из таблицы

Вы можете удалить данные, соответствующие предикату из таблицы Delta. Например, в таблице примера people_10m для удаления всех строк, соответствующих людям со значением в birthDate столбце, 1955можно выполнить следующее:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Внимание

Удаление удаляет данные из последней версии таблицы Delta, но не удаляет их из физического хранилища до тех пор, пока старые версии не будут явно вакуумированы. См. подробные сведения о команде vacuum.

Отображение журнала таблиц

Чтобы просмотреть журнал таблицы, используйте DeltaTable.history метод для Python и Scala, а также инструкцию DESCRIBE HISTORY в SQL, которая предоставляет сведения о происхождении, включая версию таблицы, операцию, пользователя и т. д. для каждой записи в таблицу.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Запрос более ранней версии таблицы (путешествия по времени)

Переход по временем в Delta Lake позволяет запрашивать более ранний моментальный снимок таблицы Delta.

Чтобы запросить старую версию таблицы, укажите версию или метку времени таблицы. Например, чтобы запросить версию 0 или метку 2024-05-15T22:43:15.000+00:00Z времени из предыдущей истории, используйте следующую команду:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Для меток времени принимаются только строки даты или метки времени, например"2024-05-15T22:43:15.000+00:00"."2024-05-15 22:43:15"

Параметры DataFrameReader позволяют создать кадр данных из таблицы Delta, фиксированной к определенной версии или метке времени таблицы, например:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Дополнительные сведения см. в разделе " Работа с журналом таблиц Delta Lake".

Оптимизация таблицы

После выполнения нескольких изменений в таблице может потребоваться много небольших файлов. Чтобы повысить скорость чтения запросов, можно использовать операцию оптимизации для сворачивания небольших файлов в большие:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize()

SQL

OPTIMIZE main.default.people_10m

Z-порядок по столбцам

Чтобы повысить производительность чтения, можно совместно использовать связанные сведения в одном наборе файлов с помощью z-упорядочения. Алгоритмы пропуска данных Delta Lake используют это кололокацию для резкого уменьшения объема данных, которые необходимо считывать. Для данных z-order можно указать столбцы для заказа в z-порядке по операции. Например, чтобы выполнить сортировку по genderкоманде, выполните следующую команду:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Полный набор параметров, доступных при выполнении операции оптимизации, см. в разделе "Оптимизация макета файла данных".

Очистка моментальных снимков с помощью VACUUM

Delta Lake обеспечивает изоляцию моментальных снимков для операций чтения, что означает, что она безопасна для выполнения операции оптимизации, даже если другие пользователи или задания запрашивают таблицу. Однако в конечном итоге старые моментальные снимки нужно удалять. Это можно сделать, выполнив операцию вакуума:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Дополнительные сведения об эффективном использовании операции вакуума см. в разделе "Удаление неиспользуемых файлов данных с помощью вакуума".