次の方法で共有


トランザクション モード

Important

Unity カタログのマネージド デルタ テーブルに書き込むトランザクションは 、パブリック プレビュー段階にあります

Unity カタログで管理されている Iceberg テーブルに書き込むトランザクションは 、プライベート プレビュー段階にあります。 このプレビューに参加するには、 管理された Iceberg テーブルのプレビュー登録フォームを送信します

トランザクションでは、非対話型モードと対話型モードの 2 つのモードがサポートされています。 このページでは、各モードを使用するタイミングについて説明し、実装例を示します。

トランザクションの要件と概要については、「 トランザクション」を参照してください。 両方のモードでの実践的な練習については、「 チュートリアル: テーブル間でトランザクションを調整する」を参照してください。

複数ステートメントで書き込まれるすべてのテーブルでは、複数テーブル トランザクションで次の処理を行う必要があります。

非対話型トランザクション

非対話型トランザクションでは、 キーワードを使用して ATOMIC使用します。 ATOMIC 複合ステートメント ブロックは、すべてのステートメントを 1 つのアトミック単位として実行します。 すべてが一緒に成功するか、すべて一緒に失敗します。

サポートされているコンピューティング: Databricks Runtime 18.0 以降を実行している SQL ウェアハウスサーバーレス コンピューティング、または クラスター

サポートされている構文: SQL、Scala spark.sql ブロック、PySpark spark.sql ブロックをサポートします。

構造化ストリーミングの forEachBatch 内で非対話型トランザクションを使用する場合は、 spark.sql("BEGIN ATOMIC ... END;")を呼び出します。 ただし、構造化ストリーミングのチェックポイントはトランザクション的に進行しない。

構文

BEGIN ATOMIC
  statement1;
  statement2;
  statement3;
END;

すべてのステートメントが成功した場合、Azure Databricks によってすべての変更が自動的にコミットされます。 いずれかのステートメントが失敗した場合、Azure Databricks はすべての変更を自動的にロールバックします。

SQL エディターでの使用

SQL エディターで非対話型トランザクションを直接実行します。 ATOMIC 複合ステートメント ブロック全体を選択し、1 つのステートメントとして実行します。

BEGIN ATOMIC
  DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;

  INSERT INTO staging_sales
  SELECT * FROM raw_sales WHERE load_date = current_date();

  MERGE INTO sales AS target
  USING staging_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

ノートブックで使用する

SQL セルまたはプログラム API を使用して、ノートブックで非対話型トランザクションを実行します。

SQL

BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;

Python

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Scala

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

スケジュールされたジョブでの使用

非対話型トランザクションは、コミットとロールバックを自動的に処理するため、スケジュールされたジョブで適切に機能します。

BEGIN ATOMIC
  -- Clear previous staging data
  DELETE FROM staging_daily_sales WHERE load_date = current_date();

  -- Load new data
  INSERT INTO staging_daily_sales
  SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
  FROM raw_sales
  WHERE sale_date = current_date() - INTERVAL 1 DAY;

  -- Validate row count (fails transaction if no data)
  IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
  END IF;

  -- Merge into production
  MERGE INTO daily_sales AS target
  USING staging_daily_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

アサーションを含むステートメントが失敗した場合、トランザクション全体が自動的にロールバックされます。

JDBC での使用

外部クライアントは、非対話型トランザクションを実行できます。

JDBC

String sql = """
    BEGIN ATOMIC
      INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
      UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
    END;
    """;

Statement stmt = conn.createStatement();
stmt.execute(sql);

ステートメント実行 API で使用する

ステートメント実行 API を使用して非対話型トランザクションを実行します。

import requests

sql = """
BEGIN ATOMIC
  INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
  UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""

response = requests.post(
    f"{workspace_url}/api/2.0/sql/statements",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "warehouse_id": warehouse_id,
        "statement": sql,
        "wait_timeout": "30s"
    }
)

ETL パターン

次のパターンは、非対話型トランザクションを使用する一般的な ETL ワークフローを示しています。

ステージングと検証のパターン

このパターンでは、データをステージング領域に読み込み、データ品質を検証し、検証済みのレコードを実稼働テーブルにマージします。

BEGIN ATOMIC
  -- Load into staging
  INSERT INTO staging_customers
  SELECT * FROM external_source
  WHERE ingest_date = current_date();

  -- Validate data quality
  IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
  END IF;

  -- Merge validated data
  MERGE INTO customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Update metadata
  UPDATE etl_metadata
  SET last_load_date = current_date(),
      rows_processed = (SELECT COUNT(*) FROM staging_customers)
  WHERE table_name = 'customers';
END;

ディメンションとファクト テーブルのパターン

このパターンは、参照整合性を維持するためにファクト テーブルを読み込む前にディメンション テーブルを更新します。

BEGIN ATOMIC
  -- Update dimension tables first
  MERGE INTO dim_products AS target
  USING staging_products AS source
  ON target.product_id = source.product_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  MERGE INTO dim_customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Then load fact table with foreign key references
  INSERT INTO fact_sales
  SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
  FROM staging_sales s
  JOIN dim_products p ON s.product_id = p.product_id
  JOIN dim_customers c ON s.customer_id = c.customer_id;
END;

エラー処理

BEGIN ATOMIC ... END; ブロック内でステートメントが失敗すると、Azure Databricks はすべての変更をロールバックし、エラー メッセージを返します。

デバッグのヒント:

  1. エラー メッセージを確認して、失敗したステートメントを特定します。
  2. トランザクション ブロックの外側でステートメントを個別にテストします。
  3. カスタムエラーメッセージとともに失敗するようSIGNALを使用して検証チェックを追加します。
  4. トランザクション履歴にクエリを実行して、追加のコンテキストを確認します。

対話型トランザクション

対話型トランザクションを使用すると、トランザクションの境界を明示的に制御できます。 トランザクションを手動で開始し、ステートメントを実行し、明示的に コミット または ロールバックします

サポートされているコンピューティング: SQL ウェアハウス のみ。

サポートされている構文: SQL のみ。

構文

BEGIN TRANSACTION;

statement1;
statement2;

COMMIT;
-- or: ROLLBACK;

コミットする前に検証する

コミットする前に、対話型トランザクションを使用して結果を検証します。

BEGIN TRANSACTION;

-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();

-- Validate and commit or rollback
BEGIN
  DECLARE duplicate_count INT;
  SET duplicate_count = (
    SELECT COUNT(*) FROM (
      SELECT customer_id, COUNT(*) as cnt
      FROM staging_customers
      WHERE load_date = current_date()
      GROUP BY customer_id
      HAVING COUNT(*) > 1
    )
  );

  IF duplicate_count > 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
  ELSE
    MERGE INTO customers AS target
    USING staging_customers AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;
    COMMIT;
  END IF;
END;

明示的なロールバック

検証に失敗した場合、またはビジネス ロジックで変更を破棄する必要がある場合は、トランザクションをロールバックします。

BEGIN TRANSACTION;

UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;

-- Check if quantity would go negative
BEGIN
  DECLARE new_quantity INT;
  SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);

  IF new_quantity < 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
  ELSE
    COMMIT;
  END IF;
END;

JDBC での使用

JDBC ドライバーは、トランザクション内の executeUpdate() を使用した DML ステートメントの実行をサポートします。 サポートされている DML ステートメントの一覧については、「 サポートされる操作」を参照してください。

JDBC クライアントでは、自動コミット モードを無効にして対話型トランザクションを使用します。

Connection conn = DriverManager.getConnection(jdbcUrl, properties);

try {
    conn.setAutoCommit(false);  // Start transaction mode
    Statement stmt = conn.createStatement();

    stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
    stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");

    conn.commit();  // Commit the transaction

} catch (SQLException e) {
    conn.rollback();  // Roll back on error
    throw e;
} finally {
    conn.close();
}

サポートされていない JDBC 操作

対話型トランザクションでは、次の JDBC 操作はサポートされていません。

カテゴリ サポートしていません
カタログまたはスキーマの切り替え Connection.setCatalog() および Connection.setSchema()
セッション構成の変更 Connection.setClientInfo() のセッション レベルのプロパティ、例えば TIMEZONEANSI_MODE の場合
すべての DatabaseMetaData (すべてのプロトコル) すべての DatabaseMetaData.* メソッド
PreparedStatement メタデータ PreparedStatement.getMetaData()
ストアド プロシージャ CALL procedure_name()

ODBC で使用する

ODBC ドライバーでは、トランザクション内で SQLExecute()SQLExecDirect() を使用した DML ステートメントの実行がサポートされています。 サポートされている DML ステートメントの一覧については、「 サポートされる操作」を参照してください。

ODBC クライアントは、標準の ODBC トランザクション管理機能を使用して、Azure Databricks ODBC ドライバーで対話型トランザクションを使用できます。

サポートされていない ODBC 操作

対話型トランザクションでは、次の ODBC 操作はサポートされていません。

カテゴリ サポートしていません
すべてのカタログ関数 SQLTablesSQLColumnsSQLStatisticsSQLSpecialColumnsSQLPrimaryKeysSQLForeignKeysSQLTablePrivilegesSQLColumnPrivilegesSQLProceduresSQLProcedureColumns
接続属性の設定 SQLSetConnectAttr() を使用したカタログ切り替え、分離レベルの変更、アクセスモードの変更
SQL 変換 SQLNativeSql

Databricks SQL Connector for Python で使用する

Databricks SQL Connector for Python では、トランザクション内で cursor.execute() を使用した DML ステートメントの実行がサポートされています。 サポートされている DML ステートメントの一覧については、「 サポートされる操作」を参照してください。

Python アプリケーションでは、次の設定することで、autocommit=False で対話型トランザクションを使用できます。

from databricks import sql

with sql.connect(
    server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
    http_path="sql/1.0/warehouses/abc123def456",
    access_token="your-access-token",
    autocommit=False
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
        connection.commit()

サポートされていない Python コネクタ操作

対話型トランザクションでは、次の Python コネクタ操作はサポートされていません。

カテゴリ サポートしていません
すべてのメタデータ cursor.catalogs()cursor.schemas()cursor.tables()cursor.columns()

対話型トランザクションのドライバーの制限事項

対話型トランザクションを使用する場合、すべてのドライバーに次の制限が適用されます。

メタデータ操作は、対話型トランザクション内ではサポートされていません。 ドライバーまたはプロトコルに関係なく、トランザクション内で次の操作が失敗する可能性があります。

ドライバー/プロトコル タイプ メソッド
JDBC DatabaseMetaData getCatalogs()getSchemas()getTables()getColumns()getTypeInfo()
ODBC カタログ関数 SQLTablesSQLColumnsSQLGetTypeInfo
Python コネクタ メタデータ メソッド cursor.catalogs()cursor.schemas()cursor.tables()cursor.columns()
SQL メタデータ コマンド SHOW TABLESSHOW DATABASESDESCRIBE TABLEUSE CATALOGUSE SCHEMA
SQL information_schema SELECT information_schema テーブルに対するクエリ

トランザクションの外部ですべてのメタデータ操作を実行します。

Warnung

1 つのドライバー接続オブジェクトで複数のスレッドでトランザクションを実行すると、未定義の動作が発生します。 各接続オブジェクトで一度に 1 つのトランザクションのみを実行します。

分離の動作

対話型トランザクションのコミットされていない変更は、セッションにのみ表示されます。 他のセッションでは、トランザクションが開始される前と同じテーブルの状態が表示されます。

対話型トランザクションでは、非対話型トランザクションよりも保守的な競合検出が使用され、テーブル レベルで競合する可能性があります (無条件の追加を除く)。 行レベルの競合検出には、非対話型トランザクション (BEGIN ATOMIC ... END;) を使用します。

  1. 分離を確認するには、サンプル テーブルが存在しない場合は作成します。
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
  1. 同じ セッションで、トランザクションを開始し、変更を加えます。

    BEGIN TRANSACTION;
    INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);
    
  2. 別の SQL エディター タブまたはノートブック セッション (同じノートブック内の新しいセルではない) で、テーブルに対してクエリを実行します。

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    コミットされていない変更は最初のセッションの外部には表示されないため、0 行が返されます。

  3. 最初のセッションに戻り、コミットを実行します。

    COMMIT;
    
  4. もう一度 2 番目のセッションからクエリを実行します。

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    トランザクションがコミットされているため、行が表示されます。

この分離により、他のユーザーがロールバックされる可能性のあるデータを読み取りできなくなります。

トランザクション モードを選択する

シナリオ 推奨モード
スケジュールされた ETL 処理 非対話型 — 自動コミットまたはロールバックにより、エラー処理が簡略化されます
ステートメント シーケンスを修正しました 非対話型 -単純な構文、手動コミットは必要ありません
コミット前のデータ検証 対話型 - 結果を検査し、コミットするかどうかを決定する
手動制御が必要な JDBC アプリケーション 対話型 - 標準のデータベース トランザクション パターン

次のステップ