次の方法で共有


チュートリアル: Delta Lake

このチュートリアルでは、Azure Databricks に対する Delta Lake の次のような一般的な操作について説明します。

クラスターなどの Azure Databricks コンピューティング リソースにアタッチされているノートブック内から、Python、Scala、または SQL のサンプル コードを実行できます。 また、Databricks SQLSQL ウェアハウスに関連付けられているクエリ内から SQL コードを実行することもできます。

ソース データを準備する

このチュートリアルでは、People 10 M というデータセットを使用します。このデータセットには、氏名、生年月日、給与など、ユーザーに関する事実を保持する 1,000 万件の架空のレコードが含まれます。 このチュートリアルでは、このデータセットが、対象の Azure Databricks ワークスペースに関連付けられている Unity Catalog のボリュームに存在することを前提としています。

このチュートリアルで使用する People 10 M データセットを取得するには、次の操作を行います。

  1. Kaggle の [People 10 M] ページに移動します。
  2. [ダウンロード] をクリックし、archive.zip という名前のファイルをローカル コンピューターにダウンロードします。
  3. archive.zip ファイルから export.csv という名前のファイルを展開します。 export.csv ファイルにはこのチュートリアルのデータが含まれます。

export.csv ファイルをボリュームにアップロードするには、次の操作を行います。

  1. サイドバーで、[カタログ] をクリックします。
  2. [カタログ エクスプローラー] で、export.csv ファイルをアップロードするボリュームを参照して開きます。
  3. [Upload to this volume] (このボリュームにアップロード) をクリックします。
  4. ローカル コンピューターにある export.csv ファイルをドラッグ アンド ドロップするか、参照して選択します。
  5. アップロードをクリックします。

次のコードの例では、/Volumes/main/default/my-volume/export.csv をターゲット ボリューム内の export.csv ファイルへのパスに置き換えます。

テーブルを作成する

Azure Databricks で作成されたすべてのテーブルでは、既定で Delta Lake が使用されます。 Databricks では、Unity Catalog マネージド テーブルの使用をお勧めしています。

前のコードの例と次のコードの例では、テーブル名 main.default.people_10m を Unity Catalog の対象となる 3 つのパートのカタログ、スキーマ、テーブル名に置き換えます。

Note

Delta Lake は、Azure Databricks のすべての読み取り、書き込み、テーブル作成コマンドのデフォルトです。

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

上記の操作では、新しいマネージド テーブルが作成されます。 Delta テーブルを作成するときに使用できるオプションの詳細については、「テーブルの作成」を参照してください。

Databricks Runtime 13.3 LTS 以降では、CREATE TABLE LIKE を使用して、ソース Delta テーブルのスキーマとテーブルのプロパティを複製する新しい空の Delta テーブルを作成できます。 これは、次のコード例に示すように、開発環境から運用環境にテーブルを昇格する場合に特に役立ちます。

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

空のテーブルを作成するには、PythonScala に Delta Lake で DeltaTableBuilder API を使用することもできます。 同等の DataFrameWriter API と比較して、これらの API を使用すると、列のコメント、テーブルのプロパティ、生成された列のような追加情報の指定が簡単になります。

重要

この機能はパブリック プレビュー段階にあります。

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

テーブルへのアップサート

一連の更新と挿入を既存の Delta テーブルにマージするには、PythonScala には DeltaTable.merge メソッドを使用し、SQL には MERGE INTO ステートメントを使用します。 たとえば、次の例は、ソース テーブルからデータを取得し、ターゲット Delta テーブルにマージします。 両方のテーブルに一致する行がある場合、Delta Lake は指定された式を使用してデータ列を更新します。 一致する行がない場合、Delta Lake によって新しい行が追加されます。 この操作はアップサートと呼ばれます。

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

SQL では、* を指定すると、ソース テーブルにターゲット テーブルと同じ列があると仮定して、ターゲット テーブルのすべての列を更新または挿入します。 ターゲット テーブルに同じ列がない場合、クエリは分析エラーをスローします。

挿入操作を実行する際には、テーブル内のすべての列に値を指定する必要があります (たとえば、既存のデータセットに一致する行がない場合など)。 ただし、すべての値を更新する必要はありません。

結果を表示するには、テーブルに対してクエリを実行します。

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

テーブルの読み取り

次の例に示すように、テーブル名またはテーブル パスによって Delta テーブルのデータにアクセスします。

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

テーブルへの書き込み

Delta Lake では、テーブルにデータを書き込むために標準構文が使用されます。

既存の Delta テーブルに新しいデータをアトミックに追加するには、次の例に示すように追加モードを使用します。

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

テーブル内のすべてのデータを置き換えるには、次の例のように上書きモードを使用 します。

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

テーブルの更新

Delta テーブルの述語に一致するデータを更新できます。 たとえば、people_10m テーブルの例で、gender 列の省略形を M または F から Male または Female, に変更するには、次のように実行できます。

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

テーブルからの削除

Delta テーブルから述語に一致するデータを削除できます。 たとえば、people_10m テーブルの例で、birthDate 列に 1955 より前の値を持つユーザーに対応する行をすべて削除するには、次のように実行できます。

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

重要

削除では、最新バージョンの Delta テーブルからデータが削除されますが、前のバージョンが明示的にバキュームされるまで、物理ストレージからデータは削除されません。 詳細についてはバキュームに関するページを参照してください。

テーブル履歴の表示

テーブルの履歴を表示するには、PythonScala には DeltaTable.history メソッドを使用し、SQL では DESCRIBE HISTORY ステートメントを使用します。このステートメントは、テーブルへの書き込みごとに、テーブルのバージョン、操作、ユーザーなどの実証情報を提供します。

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

以前のバージョンのテーブルに対してクエリを実行する (タイム トラベル)

Delta Lake タイム トラベル機能を使用すると、Delta テーブルのスナップショットを過去にさかのぼって照会することができます。

テーブルの以前のバージョンに対してクエリを実行するには、テーブルのバージョンまたはタイムスタンプを指定します。 たとえば、上記の履歴からバージョン 0 やタイムスタンプ 2024-05-15T22:43:15.000+00:00Z に対してクエリを実行するには、以下を使用します。

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

タイムスタンプの場合、"2024-05-15T22:43:15.000+00:00""2024-05-15 22:43:15" など、日付またはタイムスタンプ文字列のみを使用できます。

DataFrameReader オプションを使用すると、次のような特定のバージョンまたはタイムスタンプのテーブルに固定されている Delta テーブルから DataFrame を作成できます。

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

詳細については、「Delta Lake テーブル履歴の処理」を参照してください。

テーブルの最適化

テーブルに対して複数の変更を実行すると、多数の小さなファイルができる可能性があります。 読み取りクエリの速度を向上させるために、操作の最適化を使用して小さなファイルをより大きなファイルに折りたたむことができます。

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize()

SQL

OPTIMIZE main.default.people_10m

列による Z オーダー

読み取りパフォーマンスをさらに向上させるために、同じファイル セット内の関連情報を Z オーダー別に併置することができます。 Delta Lake データをスキップするアルゴリズムでは、読み取る必要があるデータの量を大幅に削減するためにこの併置を使用します。 Z オーダー データには、順序付けする列を操作による Z オーダーで指定します。 たとえば、gender で併置するには、次のように実行します。

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

最適化操作の実行時に使用できるオプションの完全なセットについては、「データ ファイル レイアウトを最適化する」を参照してください。

VACUUM を使用してスナップショットをクリーンアップする

デルタレイクでは読み取りのスナップショット分離が提供されるため、他のユーザーまたはジョブがテーブルに対してクエリを実行している間でも安全に操作の最適化を実行できます。 ただし、最終的には、以前のスナップショットをクリーンアップする必要があります。 これは、バキューム操作を実行することでできます。

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

バキューム操作を効果的に使用する方法の詳細については、「バキュームを使用して未使用のデータ ファイルを削除する」を参照してください。