大量のデータを処理する場合は、データセット全体を再処理するのではなく、新しいレコードと変更されたレコードのみを処理できるパイプラインが必要です。 これは増分 ETL と呼ばれます。 Databricks SQL では、手続き型コードを記述したり、手動更新をスケジュールしたりすることなく、ストリーミング テーブルと具体化されたビューを使用して、増分 ETL パイプラインを構築できます。
このチュートリアルでは、一般的なパターン (時間の経過に伴う製品の変化の追跡) について説明します。 ソース テーブルを作成し、変更イベントをキャプチャし、各製品の完全な履歴を保持するディメンション テーブルを作成し、上部に集計レポート レイヤーを追加します。
このチュートリアルの主な機能は AUTO CDCです。 従来のウェアハウスでは、挿入、更新、および削除イベントをターゲット テーブルに調整するための複雑な MERGE INTO ステートメントを記述します。 この方法は、特にイベントが順不同で到着する場合に、エラーが発生しやすくなります。
AUTO CDC これを自動的に処理します。 ビジネス キー、シーケンス列、SCD タイプ 1 (最新の値のみ) と SCD タイプ 2 (完全な履歴) のどちらを使用するかを宣言し、Azure Databricksは正しいマージ ロジックを自動的に適用します。 CDC の概要については、「 AUTO CDC API: パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。
このチュートリアルの最後には、次の内容が含まれます。
- 変更データ フィードを使用して変更を追跡するソース テーブルを作成しました。
- CDC イベント ストリームを理解するために生の変更データを検査しました。
- これらのイベントから SCD Type 2 ディメンション テーブルを作成するために
AUTO CDC使用されます。 - パイプラインを通じて段階的に処理された削除イベント。
- 集計レポートを増分的に保持する具体化されたビューを作成しました。
-
SCHEDULE REFRESH EVERY 1 DAYがパイプラインを通じて変更を自動的に反映するように設定されています。
必要条件
このチュートリアルを完了するには、次の要件を満たす必要があります。
- Unity カタログが有効になっているAzure Databricks ワークスペース。
- SQL ウェアハウス (サーバーレスまたはプロ)。
- コンピューティング リソースを作成したり、コンピューティング リソースにアクセスしたりするためのアクセス許可を持っている。
- アカウントに対して有効になっているサーバーレス コンピューティング。 「利用可能なリージョンに制限がある機能」を参照してください。
手順 1: カタログとスキーマを設定する
Databricks SQL エディターを開き、作業カタログとスキーマを設定します。 選択したカタログとスキーマを USE するアクセス許可が必要です。
USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;
手順 2: ソース テーブルを作成してデータを読み込む
products (CDF) で Delta Lake 変更データ フィードを有効にして、 テーブルを作成します。 CDF は、挿入、更新、削除のたびにクエリ可能な変更ログとして記録する Delta Lake 機能です。 これは、トランザクション ソース システムの CDC ストリームに似ていますが、変更は外部ログではなく Delta テーブル内で直接キャプチャされる点が異なります。 ここでは CDF を使用して、ダウンストリーム パイプラインが使用する変更イベントを生成します。
テーブルを作成し、初期レコードを読み込みます。
CREATE OR REPLACE TABLE products ( product_id INT, product_name STRING, category STRING, warehouse STRING ) TBLPROPERTIES (delta.enableChangeDataFeed = true); INSERT INTO products VALUES (1, 'Spoon', 'Cutlery', 'Seattle'), (2, 'Fork', 'Cutlery', 'Portland'), (3, 'Knife', 'Cutlery', 'Denver'), (4, 'Chair', 'Furniture', 'Austin'), (5, 'Table', 'Furniture', 'Chicago'), (6, 'Lamp', 'Lighting', 'Boston'), (7, 'Mug', 'Kitchenware', 'Seattle'), (8, 'Plate', 'Kitchenware', 'Atlanta'), (9, 'Bowl', 'Kitchenware', 'Dallas'), (10, 'Glass', 'Kitchenware', 'Phoenix');新しい製品、倉庫の移動、カテゴリの再割り当てを含むアップストリームの変更をシミュレートします。
INSERT INTO products VALUES (11, 'Napkin', 'Dining', 'San Francisco'), (12, 'Coaster', 'Dining', 'New York'); UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1; UPDATE products SET category = 'Dining' WHERE product_id = 2;
手順 3: 変更データ フィードのクエリを実行する
ダウンストリーム パイプラインを構築する前に、生の変更イベントを確認して、 AUTO CDC が処理する内容を理解するのに役立ちます。
table_changes()関数は CDF ログを読み取り、キャプチャされたすべての操作をメタデータ列と共に返します。
SELECT
product_id, product_name, warehouse,
_change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;
たとえば、Spoon には、 insert (シアトル)、 update_preimage (シアトル)、 update_postimage (ロサンゼルス) の 3 つのイベントがあります。
1 つの論理的な変更 (たとえば、Spoon を別の倉庫に移動するなど) では、複数のイベント (事前イメージとイメージ後) が生成されます。 従来のウェアハウスでは、 MERGE ステートメントを記述して、これらのイベントをすべてターゲット テーブルに調整し、挿入、更新、削除を個別のロジックで処理し、イベントが正しい順序で適用されるようにします。 次のステップでこのAUTO CDC複雑さを排除します。
手順 4: SCD タイプ 2 ディメンションを作成する AUTO CDC
Important
AUTO CDC は ベータ版です。 Databricks Runtime 17.3 以降が必要です。
ストリーミング テーブルは、データを段階的に処理します。 更新のたびに、前回の実行以降の新しい行のみが読み取られるので、完全なデータセットを再処理する必要はありません。 これにより、大量のソースや頻繁に変化するソースに適しています。
AUTO CDC は、ストリーミング テーブルの上に変更データ キャプチャ処理を追加します。 挿入、更新、削除を手動で処理する MERGE INTO ステートメントを記述する代わりに、ビジネス キーとシーケンス列を宣言し、Azure Databricks正しいロジックを適用できるようにします。
AUTO CDC また、順不同のイベントも自動的に処理されます。これは、 MERGE INTO を使用して分散システムから到着するイベントや、タイムスタンプが重複するバッチ読み込みを処理する場合に一般的な問題です。
次のステートメントでは、各製品の完全なバージョン履歴を保持する SCD Type 2 テーブルを作成します。 各バージョンは __START_AT および __END_AT タイムスタンプを取得します。
NULLの__END_ATは、現在のバージョンをマークします。
CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
-
SCHEDULE REFRESH EVERY 1 DAY: 毎日のスケジュールでテーブルを更新します。 -
FLOW AUTO CDC: これを CDC フローとして宣言します。 Azure Databricksは、挿入、更新、および削除セマンティクスを自動的に適用します。 -
KEYS (product_id): ビジネス キー。 同じキーを持つイベントは、バージョン管理された行にマージされます。 -
APPLY AS DELETE WHEN _change_type = 'delete': 削除イベントが到着すると、現在のバージョンが閉じられます。 これにより、削除イベントを識別する条件を定義できます。 -
SEQUENCE BY _commit_timestamp: イベントの順序付けを確立します。 注文外の到着を正しく処理します。 -
STORED AS SCD TYPE 2: 完全な履歴を保持します。AUTO CDCは、SCD タイプ 1 と SCD タイプ 2 の両方をサポートします。
ディメンション テーブルのクエリを実行します。
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
- スプーン:2つのバージョン。 シアトル (閉鎖、
__END_AT設定済み) とロサンゼルス (現在、__END_AT = NULL)。 - フォーク: 2 つのバージョン。 カトラリーカテゴリ(クローズ)とダイニングカテゴリ(現在)。
- ナプキンとコースター:それぞれ1つのバージョン(新しく挿入、
__END_AT = NULL)。 - その他のすべての製品。それぞれ1つのバージョン (
__END_AT = NULL)。
手順 5: パイプライン経由で削除を処理する
次に、ソース テーブルから削除して、廃止された 2 つの製品をシミュレートします。
DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;
これらの削除イベントは CDF ログに記録されますが、ストリーミング テーブルにはまだ記録されていません。 ストリーミング テーブルを更新して、新しいイベントを処理します。
REFRESH STREAMING TABLE products_history;
ディメンション テーブルにクエリを実行して、削除が適用されたことを確認します。
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
ボウルとガラスは、 __END_AT セットで閉じられ、廃止されたものとしてマークされるようになりました。 その他の現在の製品はすべて変更されません。 ストリーミング テーブルは、前の更新からの挿入と更新を再処理することなく、新しい削除イベントのみを処理しました。
手順 6: 集計マテリアライズド ビューを作成する
ソースの変更が反映されたディメンション テーブルが作成されたので、レポート レイヤーを上に追加できます。
具体化されたビューは、事前に計算されたクエリ結果を物理テーブルとして格納します。 クエリから読み取るたびにクエリを再実行する通常のビューとは異なり、具体化されたビューは結果を保持し、更新のたびにアップストリームの変更の影響を受ける行のみを再計算します。 これにより、クエリのパフォーマンスが重要なダッシュボードやレポートに適しています。
CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
category,
COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;
SCHEDULE REFRESH EVERY 1 DAY は、このビューが毎日のスケジュールで更新されます。 ストリーミング テーブルの同じスケジュールと組み合わせて、ソース テーブルに対する変更がディメンションを通過し、各更新サイクルの集計にカスケードする 3 段階のパイプラインが作成されました。 実行する手動更新はありません。
SELECT * FROM products_by_category ORDER BY active_products DESC;
手順 7: エンドツーエンドのカスケードを確認する
パイプラインのカスケード全体を確認するには、ソース テーブルに変更を加えます。
UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;
ナイフはデンバーからシアトルに移動します。 この単一の DML 変更により、完全なパイプライン カスケードがトリガーされ、3 つのステージがどのように連携するかを示します。
-
productsは、CDF を介して変更イベントを記録します。 -
products_historyはイベントを処理し、Knife の新しいバージョンを追加します。 -
products_by_categoryは、影響を受けるカトラリー行のみを再計算します。
確認:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;
SELECT * FROM products_by_category ORDER BY active_products DESC;
クリーンアップ
このチュートリアルで作成したリソースをクリーンアップするには、次の SQL を使用します。
DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;