Spark でデルタ テーブルを操作する

完了

デルタ テーブル (またはデルタ形式ファイル) を使用して、複数の方法でデータを取得および変更できます。

Spark SQL を使用する

Spark でデルタ テーブルのデータを操作する最も一般的な方法は、Spark SQL を使用することです。 spark.sql ライブラリを使用して、他の言語 (PySpark や Scala など) に SQL ステートメントを埋め込むことができます。 たとえば、次のコードは products テーブルに行を挿入します。

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

または、ノートブックで %%sql マジックを使用して SQL ステートメントを実行することもできます。

%%sql

UPDATE products
SET Price = 2.49 WHERE ProductId = 1;

Delta API を使用する

カタログ テーブルではなくデルタ ファイルを操作するときは、Delta Lake API を使用する方が簡単な場合があります。 デルタ形式のファイルを含むフォルダーの場所から DeltaTable のインスタンスを作成し、API を使用してテーブル内のデータを変更できます。

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

"タイム トラベル" を使用してテーブルのバージョン管理を操作する

デルタ テーブルに対して行われた変更は、テーブルのトランザクション ログに記録されます。 ログに記録されたトランザクションを使用して、テーブルに加えられた変更の履歴を表示したり、以前のバージョンのデータを取得したり ("タイム トラベル" と呼ばれる) できます。

テーブルの履歴を表示するには、次に示すように SQL コマンド DESCRIBE を使用します。

%%sql

DESCRIBE HISTORY products

このステートメントの結果には、テーブルに適用されたトランザクションが次のように示されます (一部の列は省略されています)。

version timestamp operation operationParameters
2 2023-04-04T21:46:43Z UPDATE {"predicate":"(ProductId = 1)"}
1 2023-04-04T21:42:48Z WRITE {"mode":"Append","partitionBy":"[]"}
0 2023-04-04T20:04:23Z CREATE TABLE {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"}

外部テーブルの履歴を表示するには、テーブル名の代わりにフォルダーの場所を指定します。

%%sql

DESCRIBE HISTORY 'Files/mytable'

デルタ ファイルの場所をデータフレームに読み込み、必要なバージョンを versionAsOf オプションとして指定することにより、特定のデータ バージョンからデータを取得できます。

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

または、timestampAsOf オプションを使ってタイムスタンプを指定することもできます。

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)