次の方法で共有


チュートリアル: テーブル間でトランザクションを調整する

Important

Unity カタログのマネージド デルタ テーブルに書き込むトランザクションは 、パブリック プレビュー段階にあります

Unity カタログで管理されている Iceberg テーブルに書き込むトランザクションは 、プライベート プレビュー段階にあります。 このプレビューに参加するには、 管理された Iceberg テーブルのプレビュー登録フォームを送信します

このチュートリアルでは、トランザクションを使用して複数のステートメントとテーブル間で更新を調整する方法について説明します。 トランザクション モード (非対話型トランザクション、自動的にコミットされるトランザクション) と、明示的な制御を提供する対話型トランザクションの両方について学習します。 このチュートリアルでは、ストアド プロシージャと SQL スクリプトを使用したトランザクションを使用して、Azure Databricks でミッション クリティカルなウェアハウス ワークロードを構築する方法についても説明します。

必要条件

  • 環境: Azure Databricks ワークスペースへのアクセス。
  • コンピューティング: サポートされているコンピューティングの種類は、トランザクション モードによって異なります。
    • クラシック SQL ウェアハウスまたはサーバーレス SQL ウェアハウス では、両方のトランザクション モードがサポートされています。
    • サーバーレス コンピューティング では、非対話型トランザクションのみがサポートされます。
    • Databricks Runtime 18.0 以降を実行しているクラシック クラスター では、非対話型トランザクションのみがサポートされます。
  • 特権: CREATE TABLE スキーマでの

サンプル テーブルを設定する

複数ステートメントで書き込まれるすべてのテーブルでは、複数テーブル トランザクションで次の処理を行う必要があります。

SQL エディターまたはノートブックで 2 つのサンプル テーブルを作成します。

-- Create a table for account data
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- Create a table for transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
  id INT,
  account_id INT,
  transaction_type STRING,
  amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- To upgrade an existing table, use:
-- ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

-- Insert sample data
INSERT INTO sample_accounts VALUES
  (1, 'Alice', 1000.00),
  (2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
  (1, 1, 'deposit', 100.00);

セットアップを確認します。

SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

アウトプット:

sample_accounts:
id	account_name	balance
1	  Alice	        1000.00
2	  Bob	          500.00

sample_transactions:
id	account_id	transaction_type	amount
1	           1	         deposit	100.00

非対話型トランザクション

非対話型トランザクションでは、 BEGIN ATOMIC ... END; 構文が使用されます。 すべてのステートメントは、1 つのアトミック単位として実行されます。 すべてのステートメントが成功すると、Azure Databricks は自動的にコミットします。 いずれかのステートメントが失敗した場合、Azure Databricks はすべての変更を自動的にロールバックします。 構文と使用パターンの詳細については、 非対話型トランザクションを参照してください。

成功したトランザクションを実行する

両方のテーブルをアトミックに更新します。

BEGIN ATOMIC
  -- Update Alice's account balance
  UPDATE sample_accounts
  SET balance = balance + 100.00
  WHERE id = 1;

  -- Record the deposit transaction
  INSERT INTO sample_transactions
  VALUES (2, 1, 'deposit', 100.00);
END;

両方の操作が成功したことを確認します。

-- Alice's balance should now be 1100.00
SELECT * FROM sample_accounts WHERE id = 1;

-- Should show two transaction records
SELECT * FROM sample_transactions;

残高更新とトランザクション レコードの両方が一緒に作成されました。 いずれかのステートメントが失敗した場合、どちらの変更もコミットされず、Databricks は副作用なしにトランザクションを終了します。

SIGNAL を使用して条件に基づいてトランザクションを失敗させる

SIGNAL ブロック内のBEGIN ATOMIC ... END;を使用して、ユーザー定義の条件が満たされていない場合にトランザクションを失敗させることができます。 これは、コミットする前のデータ検証に役立ちます。

BEGIN ATOMIC
  -- Insert new account
  INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

  -- Fail the transaction if balance is negative
  IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
  END IF;
END;

SIGNALによってエラーが発生し、トランザクション全体が自動的にロールバックされます。 挿入がロールバックされたことを確認してください。

-- Should return 0 rows (the transaction was rolled back by SIGNAL)
SELECT * FROM sample_accounts WHERE id = 3;

失敗時の自動ロールバックを確認する

無効なステートメントを使用してトランザクションを実行します。

BEGIN ATOMIC
  -- This statement is valid
  INSERT INTO sample_accounts VALUES (4, 'David', 300.00);

  -- This statement will fail (table does not exist)
  INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

トランザクションはエラーで失敗します。 最初のステートメントがロールバックされたことを確認します。

-- Should return 0 rows because the transaction was rolled back
SELECT * FROM sample_accounts WHERE id = 4;

最初の INSERT ステートメントは有効でしたが、2 つ目のステートメントが失敗したため、ロールバックされました。 これは、トランザクションの全保証または無保証を示しています。

対話型トランザクション

対話型トランザクションを使用すると、コミットまたはロールバックするタイミングを明示的に制御できます。 BEGIN TRANSACTION を使用して開始し、COMMIT を使用して変更を保存するか、ROLLBACK を使用して変更を破棄します。

変更をコミットする

トランザクションを開始します。

BEGIN TRANSACTION;

変更を加えます (まだコミットされていません)。

INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

変更を確定して永続化する

COMMIT;

変更を確認します。

-- Eve's account should now be visible
SELECT * FROM sample_accounts WHERE id = 5;

-- Bob's balance should be 550.00 (500 + 50)
SELECT * FROM sample_accounts WHERE id = 2;

変更をロールバックする

新しいトランザクションを開始します。

BEGIN TRANSACTION;

変更を加えます。

INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

変更がセッションに表示されていることを確認します。

-- Should show Frank's account (visible in your session only)
SELECT * FROM sample_accounts WHERE id = 6;

ロールバックして変更を破棄します。

ROLLBACK;

変更が破棄されたことを確認します。

-- Should return 0 rows (the insert was rolled back)
SELECT * FROM sample_accounts WHERE id = 6;

ストアド プロシージャと SQL スクリプトで使用する

トランザクションと ストアド プロシージャ を組み合わせて、再利用可能なトランザクション ロジックを作成できます。 このパターンは、頻繁に実行する複雑な操作に役立ちます。

  1. カタログ管理コミットを有効にして 2 つのテーブルを作成する

    
    CREATE TABLE orders (order_id STRING, item_sku STRING, quantity_ordered INT)
     TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE inventory (item_sku STRING, quantity_in_stock INT)
     TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    
  2. ストアド プロシージャを定義する

    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
        IN  p_order_id      STRING,
        IN  p_customer_id   STRING,
        IN  p_order_amount  DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
        -- Insert the order
        INSERT INTO main.retail.orders (order_id, customer_id, amount)
        VALUES (p_order_id, p_customer_id, p_order_amount);
    
        -- Update total sales per customer
        MERGE INTO main.retail.total_sales AS t
        USING (
            SELECT
              p_customer_id  AS customer_id,
              p_order_amount AS order_amount
        ) s
          ON t.customer_id = s.customer_id
        WHEN MATCHED THEN
          UPDATE SET t.total_amount = t.total_amount + s.order_amount
        WHEN NOT MATCHED THEN
          INSERT (customer_id, total_amount)
          VALUES (s.customer_id, s.order_amount);
    END;
    
    
  3. トランザクションを定義する

    BEGIN ATOMIC
        -- Staging batch id for this transaction
        DECLARE new_order_id STRING DEFAULT uuid();
        DECLARE v_batch_id STRING DEFAULT uuid();
    
        -- 1) Stage incoming customer and order rows
        INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
        VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);
    
        -- 2) Drive final writes from staging to production via stored procedure
        FOR o AS
          SELECT
            order_id,
            customer_id,
            amount
          FROM main.retail.orders_staging
          WHERE batch_id = v_batch_id
        DO
            CALL main.retail.apply_order(
              o.order_id,
              o.customer_id,
              o.amount
            );
        END FOR;
    
        -- 3) Clean up processed staging rows
        DELETE FROM main.retail.orders_staging
        WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction
    
    

トランザクションの一部が失敗した場合、Databricks はすべての変更を自動的にロールバックします。

クリーンアップ

サンプル テーブルを削除します。

DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;


DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS inventory;

次のステップ