Lakeflow Spark 宣言型パイプライン (SDP) では、パイプラインで具体化されたビューとストリーミング テーブルを定義するための新しい SQL キーワードと関数がいくつか導入されています。 パイプラインの開発に対する SQL サポートは、Spark SQL の基本に基づいており、構造化ストリーミング機能のサポートが追加されています。
PySpark DataFrames に慣れているユーザーは、Python を使用したパイプライン コードの開発を好む場合があります。 Python では、メタプログラミング操作など、SQL で実装するのが困難な、より広範なテストと操作がサポートされています。 Python を使用した Develop パイプライン コードを参照してください。
パイプライン SQL 構文の完全なリファレンスについては、 パイプライン SQL 言語リファレンスを参照してください。
パイプライン開発のための SQL の基本
パイプライン データセットを作成する SQL コードでは、 CREATE OR REFRESH 構文を使用して、クエリ結果に対して具体化されたビューとストリーミング テーブルを定義します。
STREAM キーワードは、SELECT句で参照されるデータ ソースをストリーミング セマンティクスで読み取る必要があるかどうかを示します。
パイプラインの構成時に指定されたカタログとスキーマに対する既定の読み取りと書き込みを行います。 「ターゲット カタログとスキーマのを設定する」を参照してください。
パイプライン ソース コードは SQL スクリプトとは大きく異なります。SDP は、パイプラインで構成されているすべてのソース コード ファイルのすべてのデータセット定義を評価し、クエリを実行する前にデータフロー グラフを構築します。 ソース ファイルに表示されるクエリの順序によって、コード評価の順序は定義されますが、クエリ実行の順序は定義されません。
SQL を使用して具体化されたビューを作成する
次のコード例は、SQL で具体化されたビューを作成するための基本的な構文を示しています。
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
SQL を使用してストリーミング テーブルを作成する
次のコード例は、SQL を使用してストリーミング テーブルを作成するための基本的な構文を示しています。 ストリーミング テーブルのソースを読み取る場合、 STREAM キーワードは、ソースにストリーミング セマンティクスを使用することを示します。 具体化されたビューを作成するときは、 STREAM キーワードを使用しないでください。
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
注
STREAM キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。 変更コミットがあるデータを取り込むには、Python と SkipChangeCommits オプションを使用してエラーを処理できます。
オブジェクト ストレージからデータを読み込む
パイプラインでは、Azure Databricks でサポートされているすべての形式からのデータの読み込みがサポートされています。 「データ形式のオプション」を参照してください。
注
これらの例では、ワークスペースに自動的にマウントされた /databricks-datasets で使用可能なデータを使用します。 Databricks では、クラウド オブジェクト ストレージに格納されているデータを参照するために、ボリューム パスまたはクラウド URI を使用することをお勧めします。
「Unity カタログ ボリュームとは」を参照してください。
Databricks では、クラウド オブジェクト ストレージに格納されているデータに対して増分インジェスト ワークロードを構成するときに、自動ローダーとストリーミング テーブルを使用することをお勧めします。 「自動ローダーとは」を参照してください。
SQL では、 read_files 関数を使用して自動ローダー機能を呼び出します。 また、 STREAM キーワードを使用して、 read_filesでストリーミング読み取りを構成する必要があります。
SQL での read_files の構文を次に示します。
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
自動ローダーのオプションは、キーと値のペアです。 サポートされている形式とオプションの詳細については、「 オプション」を参照してください。
次の例では、自動ローダーを使用して JSON ファイルからストリーミング テーブルを作成します。
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
read_files関数では、マテリアライズド ビューを作成するためのバッチ セマンティクスもサポートされています。 次の例では、バッチ セマンティクスを使用して JSON ディレクトリを読み取り、具体化されたビューを作成します。
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
期待値を使用してデータを検証する
期待値を使用して、データ品質の制約を設定および適用できます。 パイプラインの期待を使用してデータ品質を管理する方法については、を参照してください。
次のコードでは、データ インジェスト中に null レコードを削除する valid_data という名前の期待値を定義します。
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
パイプラインで定義された具体化されたビューとストリーミング テーブルに対してクエリを実行する
次の例では、4 つのデータセットを定義します。
- JSON データを読み込む
ordersという名前のストリーミング テーブル。 - CSV データを読み込む
customersという名前の具体化されたビュー。 -
customer_ordersという名前のマテリアライズドビューは、ordersおよびcustomersデータセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_id、order_number、state、およびorder_dateフィールドを選択します。 - 各状態の注文の日次数を集計する
daily_orders_by_stateという名前の具体化されたビュー。
注
パイプライン内のビューまたはテーブルに対してクエリを実行する場合は、カタログとスキーマを直接指定することも、パイプラインで構成された既定値を使用することもできます。 この例では、orders、customers、および customer_orders テーブルが書き込まれ、パイプライン用に構成された既定のカタログとスキーマから読み取られます。
従来の発行モードでは、LIVE スキーマを使用して、パイプラインで定義されている他の具体化されたビューとストリーミング テーブルに対してクエリを実行します。 新しいパイプラインでは、LIVE スキーマの構文は暗黙的に無視されます。
LIVE スキーマ (レガシー)を参照してください。
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
プライベート テーブルを定義する
具体化されたビューまたはストリーミング テーブルを作成するときに、 PRIVATE 句を使用できます。 プライベート テーブルを作成するときは、テーブルを作成しますが、テーブルのメタデータは作成しません。
PRIVATE句は、パイプラインで使用できるが、パイプラインの外部ではアクセスできないテーブルを作成するように SDP に指示します。 処理時間を短縮するために、プライベート テーブルは、単一の更新だけでなく、それを作成するパイプラインの有効期間中保持されます。
プライベート テーブルには、カタログ内のテーブルと同じ名前を付けることができます。 パイプライン内のテーブルに非修飾名を指定した場合、プライベート テーブルとその名前を持つカタログ テーブルの両方がある場合は、プライベート テーブルが使用されます。
プライベート テーブルは、以前は一時テーブルとして参照されていました。
具体化されたビューまたはストリーミング テーブルからレコードを完全に削除する
GDPR コンプライアンスの場合など、削除ベクトルが有効になっているストリーミング テーブルからレコードを完全に削除するには、オブジェクトの基になる Delta テーブルに対して追加の操作を実行する必要があります。 ストリーミング テーブルからレコードを確実に削除するには、「ストリーミング テーブル からレコードを完全に削除する」を参照してください。
具体化されたビューは、更新時に基になるテーブル内のデータを常に反映します。 具体化されたビューのデータを削除するには、ソースからデータを削除し、具体化されたビューを更新する必要があります。
SQL でテーブルまたはビューを宣言するときに使用される値をパラメーター化する
Spark 構成を含むテーブルまたはビューを宣言するクエリで構成値を指定するためには、SET を使用します。
SET ステートメントの後にソース ファイルで定義したテーブルまたはビューは、定義された値にアクセスできます。
SET ステートメントを使用して指定された Spark 構成は、SET ステートメントに続く任意のテーブルまたはビューに対して Spark クエリを実行するときに使用されます。 クエリで構成値を読み取るには、文字列補間構文 ${} を使用します。 次の例では、startDate という名前の Spark 構成値を設定し、その値をクエリで使用します。
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
複数の構成値を指定するには、値ごとに個別の SET ステートメントを使用します。
制限事項
PIVOT 句はサポートされていません。 Spark での pivot 操作では、出力スキーマを計算するために、入力データを積極的に読み込む必要があります。 この機能は、パイプラインではサポートされていません。
注
具体化されたビューを作成するための CREATE OR REFRESH LIVE TABLE 構文は非推奨です。 代わりにCREATE OR REFRESH MATERIALIZED VIEWを使用してください。