Создание таблиц Delta Lake

Завершено

В основе Delta Lake находятся таблицы, которые обеспечивают абстракцию реляционного хранилища над файлами в озере данных.

Создание таблицы Delta Lake из кадра данных

Один из самых простых способов создать таблицу Delta Lake — сохранить кадр данных в разностном формате, указав путь, по которому должны храниться файлы данных и связанные метаданные для таблицы.

Например, следующий код PySpark загружает кадр данных с данными из существующего файла, а затем сохраняет этот кадр данных в новое расположение папки в разностном формате:

# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

После сохранения разностной таблицы указанный путь включает файлы parquet для данных (независимо от формата исходного файла, загруженного в кадр данных) и папку _delta_log, содержащую журнал транзакций для таблицы.

Примечание.

В журнале транзакций записываются все изменения данных в таблице. За счет регистрации каждого изменения можно обеспечить согласованность транзакций и сохранить сведения о версиях для таблицы.

Существующую таблицу Delta Lake можно заменить содержимым кадра данных с помощью режима перезаписи (overwrite), как показано ниже.

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

Кроме того, вы можете добавить строки из кадра данных в существующую таблицу, используя режим добавления (append):

new_rows_df.write.format("delta").mode("append").save(delta_table_path)

Выполнение условных обновлений

Вы можете внести изменения данных в кадр данных, а затем заменить таблицу Delta Lake, перезаписав ее, но более распространенным шаблоном в базе данных является вставка, обновление или удаление строк в существующей таблице с помощью дискретных транзакционных операций. Чтобы внести такие изменения в таблицу Delta Lake, можно использовать объект DeltaTable в API Delta Lake, который поддерживает операции обновления, удаления и слияния. Например, можно использовать следующий код для обновления столбца price для всех строк, у которых значение столбца category равно "Accessories".

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Изменения данных записываются в журнал транзакций, а новые файлы Parquet создаются в папке таблицы по мере необходимости.

Совет

Дополнительные сведения об использовании API Delta Lake см . в документации по API Delta Lake.

Запрос предыдущей версии таблицы

Таблицы Delta Lake поддерживают управление версиями в журнале транзакций. Журнал транзакций записывает изменения, внесенные в таблицу, отмечая метку времени и номер версии для каждой транзакции. Эти записанные данные версий можно использовать для просмотра предыдущих версий таблицы. Эта функция известна как переход по времени.

Вы можете получить данные из определенной версии таблицы Delta Lake, считав данные из расположения разностной таблицы в кадр данных с указанием требуемой версии в качестве параметра versionAsOf:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

Кроме того, можно указать метку времени с помощью параметра timestampAsOf:

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)