Important
Unity カタログのマネージド デルタ テーブルに書き込むトランザクションは 、パブリック プレビュー段階にあります。
Unity カタログで管理されている Iceberg テーブルに書き込むトランザクションは 、プライベート プレビュー段階にあります。 このプレビューに参加するには、 管理された Iceberg テーブルのプレビュー登録フォームを送信します。
このチュートリアルでは、トランザクションを使用して複数のステートメントとテーブル間で更新を調整する方法について説明します。 トランザクション モード (非対話型トランザクション、自動的にコミットされるトランザクション) と、明示的な制御を提供する対話型トランザクションの両方について学習します。 このチュートリアルでは、ストアド プロシージャと SQL スクリプトを使用したトランザクションを使用して、Azure Databricks でミッション クリティカルなウェアハウス ワークロードを構築する方法についても説明します。
必要条件
- 環境: Azure Databricks ワークスペースへのアクセス。
-
コンピューティング: サポートされているコンピューティングの種類は、トランザクション モードによって異なります。
- クラシック SQL ウェアハウスまたはサーバーレス SQL ウェアハウス では、両方のトランザクション モードがサポートされています。
- サーバーレス コンピューティング では、非対話型トランザクションのみがサポートされます。
- Databricks Runtime 18.0 以降を実行しているクラシック クラスター では、非対話型トランザクションのみがサポートされます。
-
特権:
CREATE TABLEスキーマでの。
サンプル テーブルを設定する
複数ステートメントで書き込まれるすべてのテーブルでは、複数テーブル トランザクションで次の処理を行う必要があります。
- Unity カタログのマネージド テーブル (Delta または Iceberg) にする
- カタログ管理コミットを有効にする
SQL エディターまたはノートブックで 2 つのサンプル テーブルを作成します。
-- Create a table for account data
CREATE TABLE IF NOT EXISTS sample_accounts (
id INT,
account_name STRING,
balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
'delta.feature.catalogManaged' = 'supported'
);
-- Create a table for transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
id INT,
account_id INT,
transaction_type STRING,
amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
'delta.feature.catalogManaged' = 'supported'
);
-- To upgrade an existing table, use:
-- ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
-- Insert sample data
INSERT INTO sample_accounts VALUES
(1, 'Alice', 1000.00),
(2, 'Bob', 500.00);
INSERT INTO sample_transactions VALUES
(1, 1, 'deposit', 100.00);
セットアップを確認します。
SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;
アウトプット:
sample_accounts:
id account_name balance
1 Alice 1000.00
2 Bob 500.00
sample_transactions:
id account_id transaction_type amount
1 1 deposit 100.00
非対話型トランザクション
非対話型トランザクションでは、 BEGIN ATOMIC ... END; 構文が使用されます。 すべてのステートメントは、1 つのアトミック単位として実行されます。 すべてのステートメントが成功すると、Azure Databricks は自動的にコミットします。 いずれかのステートメントが失敗した場合、Azure Databricks はすべての変更を自動的にロールバックします。 構文と使用パターンの詳細については、 非対話型トランザクションを参照してください。
成功したトランザクションを実行する
両方のテーブルをアトミックに更新します。
BEGIN ATOMIC
-- Update Alice's account balance
UPDATE sample_accounts
SET balance = balance + 100.00
WHERE id = 1;
-- Record the deposit transaction
INSERT INTO sample_transactions
VALUES (2, 1, 'deposit', 100.00);
END;
両方の操作が成功したことを確認します。
-- Alice's balance should now be 1100.00
SELECT * FROM sample_accounts WHERE id = 1;
-- Should show two transaction records
SELECT * FROM sample_transactions;
残高更新とトランザクション レコードの両方が一緒に作成されました。 いずれかのステートメントが失敗した場合、どちらの変更もコミットされず、Databricks は副作用なしにトランザクションを終了します。
SIGNAL を使用して条件に基づいてトランザクションを失敗させる
SIGNAL ブロック内のBEGIN ATOMIC ... END;を使用して、ユーザー定義の条件が満たされていない場合にトランザクションを失敗させることができます。 これは、コミットする前のデータ検証に役立ちます。
BEGIN ATOMIC
-- Insert new account
INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);
-- Fail the transaction if balance is negative
IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
END IF;
END;
SIGNALによってエラーが発生し、トランザクション全体が自動的にロールバックされます。 挿入がロールバックされたことを確認してください。
-- Should return 0 rows (the transaction was rolled back by SIGNAL)
SELECT * FROM sample_accounts WHERE id = 3;
失敗時の自動ロールバックを確認する
無効なステートメントを使用してトランザクションを実行します。
BEGIN ATOMIC
-- This statement is valid
INSERT INTO sample_accounts VALUES (4, 'David', 300.00);
-- This statement will fail (table does not exist)
INSERT INTO non_existent_table VALUES (1, 2, 3);
END;
トランザクションはエラーで失敗します。 最初のステートメントがロールバックされたことを確認します。
-- Should return 0 rows because the transaction was rolled back
SELECT * FROM sample_accounts WHERE id = 4;
最初の INSERT ステートメントは有効でしたが、2 つ目のステートメントが失敗したため、ロールバックされました。 これは、トランザクションの全保証または無保証を示しています。
対話型トランザクション
対話型トランザクションを使用すると、コミットまたはロールバックするタイミングを明示的に制御できます。 BEGIN TRANSACTION を使用して開始し、COMMIT を使用して変更を保存するか、ROLLBACK を使用して変更を破棄します。
変更をコミットする
トランザクションを開始します。
BEGIN TRANSACTION;
変更を加えます (まだコミットされていません)。
INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;
変更を確定して永続化する
COMMIT;
変更を確認します。
-- Eve's account should now be visible
SELECT * FROM sample_accounts WHERE id = 5;
-- Bob's balance should be 550.00 (500 + 50)
SELECT * FROM sample_accounts WHERE id = 2;
変更をロールバックする
新しいトランザクションを開始します。
BEGIN TRANSACTION;
変更を加えます。
INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);
変更がセッションに表示されていることを確認します。
-- Should show Frank's account (visible in your session only)
SELECT * FROM sample_accounts WHERE id = 6;
ロールバックして変更を破棄します。
ROLLBACK;
変更が破棄されたことを確認します。
-- Should return 0 rows (the insert was rolled back)
SELECT * FROM sample_accounts WHERE id = 6;
ストアド プロシージャと SQL スクリプトで使用する
トランザクションと ストアド プロシージャ を組み合わせて、再利用可能なトランザクション ロジックを作成できます。 このパターンは、頻繁に実行する複雑な操作に役立ちます。
カタログ管理コミットを有効にして 2 つのテーブルを作成する
CREATE TABLE orders (order_id STRING, item_sku STRING, quantity_ordered INT) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported'); CREATE TABLE inventory (item_sku STRING, quantity_in_stock INT) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');ストアド プロシージャを定義する
CREATE OR REPLACE PROCEDURE main.retail.apply_order( IN p_order_id STRING, IN p_customer_id STRING, IN p_order_amount DECIMAL(18,2) ) LANGUAGE SQL SQL SECURITY INVOKER MODIFIES SQL DATA AS BEGIN -- Insert the order INSERT INTO main.retail.orders (order_id, customer_id, amount) VALUES (p_order_id, p_customer_id, p_order_amount); -- Update total sales per customer MERGE INTO main.retail.total_sales AS t USING ( SELECT p_customer_id AS customer_id, p_order_amount AS order_amount ) s ON t.customer_id = s.customer_id WHEN MATCHED THEN UPDATE SET t.total_amount = t.total_amount + s.order_amount WHEN NOT MATCHED THEN INSERT (customer_id, total_amount) VALUES (s.customer_id, s.order_amount); END;トランザクションを定義する
BEGIN ATOMIC -- Staging batch id for this transaction DECLARE new_order_id STRING DEFAULT uuid(); DECLARE v_batch_id STRING DEFAULT uuid(); -- 1) Stage incoming customer and order rows INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id) VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id); -- 2) Drive final writes from staging to production via stored procedure FOR o AS SELECT order_id, customer_id, amount FROM main.retail.orders_staging WHERE batch_id = v_batch_id DO CALL main.retail.apply_order( o.order_id, o.customer_id, o.amount ); END FOR; -- 3) Clean up processed staging rows DELETE FROM main.retail.orders_staging WHERE batch_id = v_batch_id; END; -- 4) Commit the transaction
トランザクションの一部が失敗した場合、Databricks はすべての変更を自動的にロールバックします。
クリーンアップ
サンプル テーブルを削除します。
DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;
DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS inventory;
次のステップ
- トランザクション: トランザクションのサポートの概要。
- トランザクション モード: 両方のモードの詳細な構文とパターン。
- カタログ管理コミット: テーブルでトランザクションのサポートを有効にします。
- さまざまなクライアントからのトランザクションを使用する: JDBC、ODBC、Python アプリケーションからトランザクションを実行します。