Delta Live Tables の Python 言語リファレンス

この記事では、Delta Live Tables の Python プログラミング インターフェイスの詳細について説明します。

SQL API の詳細については、「Delta Live Tables の SQL 言語リファレンス」を参照してください。

自動ローダーの構成に固有の詳細については、自動ローダーに関する記事を参照してください。

制限事項

Delta Live Tables Python インターフェイスには、次の制限があります。

  • Python tableview の関数は DataFrame を返す必要があります。 DataFrames で動作する一部の関数は DataFrame を返さないので、使用しないでください。 DataFrame 変換は完全なデータフロー グラフが解決された "後" に実行されるため、このような操作を使用すると意図しない副作用が発生する可能性があります。 これらの操作には、collect()count()toPandas()save()saveAsTable() などの関数が含まれます。 ただし、このコードはグラフの初期化フェーズ中に 1 回実行されるため、table または view 関数定義の外部にこれらの関数を含めることができます。
  • pivot() 関数はサポートされません。 Spark での pivot 操作では、出力のスキーマを計算するために、入力データを積極的に読み込む必要があります。 この機能は、Delta Live Tables ではサポートされていません。

dlt Python モジュールをインポートする

Delta Live Tables の Python 関数は、dlt モジュールで定義されています。 Python API で実装されたパイプラインは、このモジュールをインポートする必要があります。

import dlt

Delta Live Tables の具体化されたビューまたはストリーミング テーブルを作成する

Python では、Delta Live Tables が定義クエリに基づいて、データセットを具体化されたビューとして更新するか、ストリーミング テーブルとして更新するかを決定します。 @table デコレーターは、具体化されたビューとストリーミング テーブルの両方を定義するときに使用されます。

具体化されたビューを Python で定義するには、データ ソースに対して静的読み取りを実行するクエリに @table を適用します。 ストリーミング テーブルを定義するには、データ ソースに対してストリーミング読み取りを実行するクエリに @table を適用します。 次に示すように、どちらのデータセット型も構文仕様は同じです。

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Delta Live Tables ビューを作成する

Python でビューを定義するには、@view デコレーターを適用します。 @table デコレーターと同様、Delta Live Tables では、ビューを静的データセットまたはストリーミング データセットのいずれかに使用できます。 Python でビューを定義するための構文を次に示します。

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

例: テーブルとビューを定義する

Python でビューまたはテーブルを定義するには、@dlt.view または @dlt.table のデコレーターを関数に適用します。 関数名または name パラメーターを使用して、テーブルまたはビューの名前を割り当てることができます。 次の例では、2 つの異なるデータセットを定義します。JSON ファイルを入力ソースとして受け取る taxi_raw というビューと、taxi_raw ビューを入力として受け取る filtered_data というテーブルです。

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

例: 同じパイプラインで定義されているデータセットにアクセスする

外部データ ソースからの読み取りに加えて、Delta Live Tables の read() 関数を使用して、同じパイプラインで定義されているデータセットにアクセスできます。 次の例は、read() 関数を使用して customers_filtered データセットを作成する方法を示しています。

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

spark.table() 関数を使用して、同じパイプラインで定義されているデータセットにアクセスすることもできます。 spark.table() 関数を使用して、パイプラインで定義されているデータセットにアクセスする場合は、関数の引数で、データセット名の前に LIVE キーワードを追加します。

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

例: メタストアに登録されているテーブルから読み取る

Hive メタストアに登録されているテーブルからデータを読み取るには、関数の引数で、 LIVE キーワードを省略し、必要に応じてテーブル名をデータベース名で修飾します。

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Unity Catalog テーブルからの読み取りの例については、「Unity Catalog パイプラインにデータを取り込む」を参照してください。

例: spark.sql を使用してデータセットにアクセスする

クエリ関数で spark.sql 式を使用してデータセットを返すこともできます。 内部データセットから読み取るには、データセット名の前に LIVE. を追加します。

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

ストリーミング操作のターゲットとして使用するテーブルを作成する

create_streaming_table() 関数を使用して、apply_changes()@append_flow 出力レコードなど、ストリーミング操作によって出力されるレコードのターゲット テーブルを作成します。

Note

create_target_table() 関数と create_streaming_live_table() 関数は非推奨です。 Databricks では、create_streaming_table() 関数を使用するように既存のコードを更新することをお勧めします。

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
引数
name

型: str

テーブル名。

このパラメーターは必須です。
comment

型: str

テーブルの省略可能な説明。
spark_conf

型: dict

このクエリの実行の Spark 構成の省略可能なリスト。
table_properties

型: dict

テーブルのテーブル プロパティの省略可能なリスト。
partition_cols

型: array

テーブルのパーティション分割に使用する 1 つ以上の列の省略可能なリスト。
path

型: str

テーブル データの省略可能な保存場所。 設定されていない場合、システムは既定でパイプラインの保存場所に設定します。
schema

型: str または StructType

テーブルの省略可能なスキーマ定義。 スキーマは、SQL DDL 文字列としてまたは Python を使用して定義できます。
StructType
expect_all
expect_all_or_drop
expect_all_or_fail

型: dict

テーブルのオプションのデータ品質制約。 「複数の期待値」を参照してください。

テーブルが具体化される方法を制御する

テーブルには、その具体化の追加のコントロールも用意されています。

Note

サイズが 1 TB 未満のテーブルの場合、Databricks では、Delta Live Tables でデータ編成を制御できるようにすることをお勧めします。 テーブルのサイズが 1 テラバイトを超えて拡大することが予想される場合を除き、通常はパーティション列を指定しないでください。

例: スキーマ列とパーティション列を指定する

必要に応じて、Python StructType または SQL DDL 文字列を使用して、テーブル スキーマを指定できます。 DDL 文字列で指定する場合、定義には生成された列を含めることができます。

次の例では、Python StructType を使用して指定されたスキーマを使用して、 sales という名前のテーブルを作成します:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

次の例では、DDL 文字列を使用してテーブルのスキーマを指定し、生成された列を定義して、パーティション列を定義します。

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

既定では、スキーマが指定されなかった場合、Delta Live Tables は、table 定義からスキーマを推論します。

ソース ストリーミング テーブルの変更を無視するようにストリーミング テーブルを構成する

Note

  • skipChangeCommits フラグは、option() 関数を使用する spark.readStream でのみ動作します。 このフラグは、dlt.read_stream() 関数で使用することはできません。
  • ソース ストリーミング テーブルが apply_changes() 関数のターゲットとして定義されている場合、skipChangeCommits フラグを使用することはできません。

既定では、ストリーミング テーブルには追加のみのソースが必要です。 あるストリーミング テーブルで、別のストリーミング テーブルがソースとして使用されており、そのソース ストリーミング テーブルに更新または削除が必要な場合、たとえば、GDPR の "忘れられる権利" 処理では、それらの変更を無視するために、ソース ストリーミング テーブルの読み込み時に skipChangeCommits フラグを設定することができます。 このフラグの詳細については、「更新と削除を無視する」を参照してください。

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Python Delta Live Tables のプロパティ

次の表では、Delta Live Tables を使用してテーブルとビューを定義するときに指定できるオプションとプロパティについて説明します。

@table または @view
name

型: str

テーブルまたはビューの省略可能な名前。 定義されていない場合は、関数名がテーブルまたはビューの名前として使用されます。
comment

型: str

テーブルの省略可能な説明。
spark_conf

型: dict

このクエリの実行の Spark 構成の省略可能なリスト。
table_properties

型: dict

テーブルのテーブル プロパティの省略可能なリスト。
path

型: str

テーブル データの省略可能な保存場所。 設定されていない場合、システムは既定でパイプラインの保存場所に設定します。
partition_cols

型: a collection of str

テーブルのパーティション分割に使用する 1 つ以上の列の省略可能なコレクション (list など)。
schema

型: str または StructType

テーブルの省略可能なスキーマ定義。 スキーマは、SQL DDL 文字列としてまたは Python を使用して定義できます。
StructType
temporary

型: bool

テーブルを作成しますが、テーブルのメタデータは発行しません。 temporary キーワードは、パイプラインでは使用できるが、パイプラインの外部ではアクセスできないテーブルを作成するように Delta Live Tables に指示します。 処理時間を短縮するために、一時テーブルは、単一の更新だけでなく、それを作成するパイプラインの有効期間中保持されます。

既定値は "False" です。
テーブルまたはビューの定義
def <function-name>()

データセットを定義する Python 関数。 name パラメーターが設定されていない場合は、<function-name> がターゲット データセット名として使用されます。
query

Spark Dataset または Koalas DataFrame を返す Spark SQL ステートメント。

dlt.read() または spark.table() を使用して、同じパイプラインで定義されているデータセットからの完全な読み取りを実行します。 spark.table() 関数を使用して、同じパイプラインで定義されているデータセットから読み取る場合は、関数の引数でデータセット名の前に LIVE キーワードを追加します。 たとえば、customers という名前のデータセットから読み取るには、次のようにします。

spark.table("LIVE.customers")

また、LIVE キーワードを省略し、必要に応じてテーブル名をデータベース名で修飾することにより、spark.table() 関数を使用して、メタストアに登録されているテーブルから読み取ることもできます。

spark.table("sales.customers")

dlt.read_stream() を使用して、同じパイプラインで定義されているデータセットからのストリーミング読み取りを実行します。

spark.sql 関数を使用して、返されるデータセットを作成する SQL クエリを定義します。

PySpark 構文を使用して、Python で Delta Live Tables クエリを定義します。
期待される回答
@expect("description", "constraint")

次によって識別されるデータ品質の制約を宣言します。
description 行が期待値に違反する場合は、ターゲット データセットに行を含めます。
@expect_or_drop("description", "constraint")

次によって識別されるデータ品質の制約を宣言します。
description 行が期待値に違反する場合は、ターゲット データセットから行を削除します。
@expect_or_fail("description", "constraint")

次によって識別されるデータ品質の制約を宣言します。
description 行が期待値に違反する場合は、すぐに実行を停止します。
@expect_all(expectations)

1 つ以上のデータ品質の制約を宣言します。
expectations は Python ディクショナリです。キーは期待値の説明であり、値は期待値の制約です。 行がいずれかの期待値に違反する場合は、ターゲット データセットに行を含めます。
@expect_all_or_drop(expectations)

1 つ以上のデータ品質の制約を宣言します。
expectations は Python ディクショナリです。キーは期待値の説明であり、値は期待値の制約です。 行がいずれかの期待値に違反する場合は、ターゲット データセットから行を削除します。
@expect_all_or_fail(expectations)

1 つ以上のデータ品質の制約を宣言します。
expectations は Python ディクショナリです。キーは期待値の説明であり、値は期待値の制約です。 行がいずれかの期待値に違反する場合は、すぐに実行を停止します。

Delta Live Tables での Python を使用した変更データ キャプチャ

Python API で apply_changes() 関数を使用して、Delta Live Tables CDC 機能を使用します。 Delta Live Tables Python インターフェイスには create_streaming_table() 関数も用意されています。 この関数を使用して、apply_changes() 関数に必要なターゲット テーブルを作成できます。

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Note

INSERT イベントと UPDATE イベントの既定の動作では、ソースから CDC イベントを upsert します。指定したキーに一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。 DELETE イベントの処理は、APPLY AS DELETE WHEN 条件で指定できます。

重要

変更を適用する対象のターゲット ストリーミング テーブルを宣言する必要があります。 必要に応じて、ターゲット テーブルのスキーマを指定できます。 apply_changes ターゲット テーブルのスキーマを指定する場合は、sequence_by フィールドと同じデータ型で __START_AT および __END_AT 列も含める必要があります。

APPLY CHANGES API: Delta Live Tables の変更データ キャプチャを簡略化する」を参照してください。

引数
target

型: str

更新するテーブルの名前。 create_streaming_table() 関数を使用して、apply_changes() 関数を実行する前にターゲット テーブルを作成できます。

このパラメーターは必須です。
source

型: str

CDC レコードを含むデータ ソース。

このパラメーターは必須です。
keys

型: list

ソース データ内の行を一意に識別する列または列の組み合わせ。 これは、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。

次のいずれかを指定できます。

* 文字列のリスト: ["userId", "orderId"]
* Spark SQL col() 関数の一覧: [col("userId"), col("orderId"]

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。

このパラメーターは必須です。
sequence_by

型: str または col()

ソース データ内の CDC イベントの論理順序を指定する列名。 Delta Live Tables では、このシーケンス処理を使用して、順不同で到着する変更イベントを処理します。

次のいずれかを指定できます。

* 文字列: "sequenceNum"
* Spark SQL col() 関数: col("sequenceNum")

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。

このパラメーターは必須です。
ignore_null_updates

型: bool

ターゲット列のサブセットを含む更新プログラムの取り込みを許可します。 CDC イベントが既存の行と一致し、ignore_null_updatesTrue の場合、null を持つ列はターゲット内の既存の値を保持します。 これは、null の値を持つ入れ子になった列にも適用されます。 ignore_null_updatesFalse の場合、既存の値は null 値で上書きされます。

このパラメーターは省略可能です。

既定値は、False です。
apply_as_deletes

型: str または expr()

CDC イベントをアップサートでなく DELETE として扱う必要がある場合に指定します。 順序の誤ったデータを処理するために、削除された行は基になる Delta テーブルの廃棄標識として一時的に保持され、これらの廃棄標識をフィルターで除外するビューがメタストアに作成されます。 データ保持間隔は次を使用して構成できます。
pipelines.cdc.tombstoneGCThresholdInSecondsテーブル プロパティ

次のいずれかを指定できます。

* 文字列: "Operation = 'DELETE'"
* Spark SQL expr() 関数: expr("Operation = 'DELETE'")

このパラメーターは省略可能です。
apply_as_truncates

型: str または expr()

CDC イベントを完全なテーブル TRUNCATE として扱う必要がある場合に指定します。 この句はターゲット テーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユース ケースでのみ使用する必要があります。

この apply_as_truncates パラメーターは、SCD タイプ 1 でのみサポートされます。 SCD タイプ 2 では、切り捨てはサポートされていません。

次のいずれかを指定できます。

* 文字列: "Operation = 'TRUNCATE'"
* Spark SQL expr() 関数: expr("Operation = 'TRUNCATE'")

このパラメーターは省略可能です。
column_list

except_column_list

型: list

ターゲット テーブルに含める列のサブセット。 column_list を使用して、含める列の完全な一覧を指定します。 except_column_list を使用して、除外する列を指定します。 値は、文字列の一覧として、または Spark SQL col() 関数として宣言できます。

* column_list = ["userId", "name", "city"]
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。

このパラメーターは省略可能です。

既定では、column_list 引数または except_column_list 引数が関数に渡されない場合、ターゲット テーブル内のすべての列が含まれます。
stored_as_scd_type

型: str または int

レコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。

SCD タイプ 1 の場合は 1 に、SCD タイプ 2 の場合は 2 に設定します。

この句は省略可能です。

既定値は SCD タイプ 1 です。
track_history_column_list

track_history_except_column_list

型: list

ターゲット テーブル内の履歴の追跡対象となる出力列のサブセット。 track_history_column_list を使用して、追跡する列の完全なリストを指定します。 用途
track_history_except_column_list を使用して、追跡から除外する列を指定します。 値は、文字列のリストとして、または Spark SQL col() 関数として宣言できます:- track_history_column_list = ["userId", "name", "city"]。 - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 関数の引数に修飾子を含めることはできません。 たとえば、col(userId) を使用することはできますが、col(source.userId) を使用することはできません。

このパラメーターは省略可能です。

既定では、track_history_column_listがないか以下の場合、ターゲット テーブルのすべての列が含まれます
track_history_except_column_list引数が関数に渡されます。