Important
Unity カタログのマネージド デルタ テーブルに書き込むトランザクションは 、パブリック プレビュー段階にあります。
Unity カタログで管理されている Iceberg テーブルに書き込むトランザクションは 、プライベート プレビュー段階にあります。 このプレビューに参加するには、 管理された Iceberg テーブルのプレビュー登録フォームを送信します。
トランザクションでは、非対話型モードと対話型モードの 2 つのモードがサポートされています。 このページでは、各モードを使用するタイミングについて説明し、実装例を示します。
トランザクションの要件と概要については、「 トランザクション」を参照してください。 両方のモードでの実践的な練習については、「 チュートリアル: テーブル間でトランザクションを調整する」を参照してください。
注
複数ステートメントで書き込まれるすべてのテーブルでは、複数テーブル トランザクションで次の処理を行う必要があります。
- Unity カタログのマネージド テーブル (Delta または Iceberg) にする
- カタログ管理コミットを有効にする
非対話型トランザクション
非対話型トランザクションでは、 キーワードを使用して ATOMIC使用します。
ATOMIC 複合ステートメント ブロックは、すべてのステートメントを 1 つのアトミック単位として実行します。 すべてが一緒に成功するか、すべて一緒に失敗します。
サポートされているコンピューティング: Databricks Runtime 18.0 以降を実行している SQL ウェアハウス、 サーバーレス コンピューティング、または クラスター 。
サポートされている構文: SQL、Scala spark.sql ブロック、PySpark spark.sql ブロックをサポートします。
注
構造化ストリーミングの forEachBatch 内で非対話型トランザクションを使用する場合は、 spark.sql("BEGIN ATOMIC ... END;")を呼び出します。 ただし、構造化ストリーミングのチェックポイントはトランザクション的に進行しない。
構文
BEGIN ATOMIC
statement1;
statement2;
statement3;
END;
すべてのステートメントが成功した場合、Azure Databricks によってすべての変更が自動的にコミットされます。 いずれかのステートメントが失敗した場合、Azure Databricks はすべての変更を自動的にロールバックします。
SQL エディターでの使用
SQL エディターで非対話型トランザクションを直接実行します。 ATOMIC 複合ステートメント ブロック全体を選択し、1 つのステートメントとして実行します。
BEGIN ATOMIC
DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;
INSERT INTO staging_sales
SELECT * FROM raw_sales WHERE load_date = current_date();
MERGE INTO sales AS target
USING staging_sales AS source
ON target.sale_id = source.sale_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
END;
ノートブックで使用する
SQL セルまたはプログラム API を使用して、ノートブックで非対話型トランザクションを実行します。
SQL
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
Python
spark.sql("""
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
""")
Scala
spark.sql("""
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
""")
スケジュールされたジョブでの使用
非対話型トランザクションは、コミットとロールバックを自動的に処理するため、スケジュールされたジョブで適切に機能します。
BEGIN ATOMIC
-- Clear previous staging data
DELETE FROM staging_daily_sales WHERE load_date = current_date();
-- Load new data
INSERT INTO staging_daily_sales
SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
FROM raw_sales
WHERE sale_date = current_date() - INTERVAL 1 DAY;
-- Validate row count (fails transaction if no data)
IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
END IF;
-- Merge into production
MERGE INTO daily_sales AS target
USING staging_daily_sales AS source
ON target.sale_id = source.sale_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
END;
アサーションを含むステートメントが失敗した場合、トランザクション全体が自動的にロールバックされます。
JDBC での使用
外部クライアントは、非対話型トランザクションを実行できます。
JDBC
String sql = """
BEGIN ATOMIC
INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
END;
""";
Statement stmt = conn.createStatement();
stmt.execute(sql);
ステートメント実行 API で使用する
ステートメント実行 API を使用して非対話型トランザクションを実行します。
import requests
sql = """
BEGIN ATOMIC
INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""
response = requests.post(
f"{workspace_url}/api/2.0/sql/statements",
headers={"Authorization": f"Bearer {token}"},
json={
"warehouse_id": warehouse_id,
"statement": sql,
"wait_timeout": "30s"
}
)
ETL パターン
次のパターンは、非対話型トランザクションを使用する一般的な ETL ワークフローを示しています。
ステージングと検証のパターン
このパターンでは、データをステージング領域に読み込み、データ品質を検証し、検証済みのレコードを実稼働テーブルにマージします。
BEGIN ATOMIC
-- Load into staging
INSERT INTO staging_customers
SELECT * FROM external_source
WHERE ingest_date = current_date();
-- Validate data quality
IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
END IF;
-- Merge validated data
MERGE INTO customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Update metadata
UPDATE etl_metadata
SET last_load_date = current_date(),
rows_processed = (SELECT COUNT(*) FROM staging_customers)
WHERE table_name = 'customers';
END;
ディメンションとファクト テーブルのパターン
このパターンは、参照整合性を維持するためにファクト テーブルを読み込む前にディメンション テーブルを更新します。
BEGIN ATOMIC
-- Update dimension tables first
MERGE INTO dim_products AS target
USING staging_products AS source
ON target.product_id = source.product_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
MERGE INTO dim_customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Then load fact table with foreign key references
INSERT INTO fact_sales
SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
FROM staging_sales s
JOIN dim_products p ON s.product_id = p.product_id
JOIN dim_customers c ON s.customer_id = c.customer_id;
END;
エラー処理
BEGIN ATOMIC ... END; ブロック内でステートメントが失敗すると、Azure Databricks はすべての変更をロールバックし、エラー メッセージを返します。
デバッグのヒント:
- エラー メッセージを確認して、失敗したステートメントを特定します。
- トランザクション ブロックの外側でステートメントを個別にテストします。
- カスタムエラーメッセージとともに失敗するよう
SIGNALを使用して検証チェックを追加します。 - トランザクション履歴にクエリを実行して、追加のコンテキストを確認します。
対話型トランザクション
対話型トランザクションを使用すると、トランザクションの境界を明示的に制御できます。 トランザクションを手動で開始し、ステートメントを実行し、明示的に コミット または ロールバックします。
サポートされているコンピューティング: SQL ウェアハウス のみ。
サポートされている構文: SQL のみ。
構文
BEGIN TRANSACTION;
statement1;
statement2;
COMMIT;
-- or: ROLLBACK;
コミットする前に検証する
コミットする前に、対話型トランザクションを使用して結果を検証します。
BEGIN TRANSACTION;
-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();
-- Validate and commit or rollback
BEGIN
DECLARE duplicate_count INT;
SET duplicate_count = (
SELECT COUNT(*) FROM (
SELECT customer_id, COUNT(*) as cnt
FROM staging_customers
WHERE load_date = current_date()
GROUP BY customer_id
HAVING COUNT(*) > 1
)
);
IF duplicate_count > 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
ELSE
MERGE INTO customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
COMMIT;
END IF;
END;
明示的なロールバック
検証に失敗した場合、またはビジネス ロジックで変更を破棄する必要がある場合は、トランザクションをロールバックします。
BEGIN TRANSACTION;
UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;
-- Check if quantity would go negative
BEGIN
DECLARE new_quantity INT;
SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);
IF new_quantity < 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
ELSE
COMMIT;
END IF;
END;
JDBC での使用
JDBC ドライバーは、トランザクション内の executeUpdate() を使用した DML ステートメントの実行をサポートします。 サポートされている DML ステートメントの一覧については、「 サポートされる操作」を参照してください。
JDBC クライアントでは、自動コミット モードを無効にして対話型トランザクションを使用します。
Connection conn = DriverManager.getConnection(jdbcUrl, properties);
try {
conn.setAutoCommit(false); // Start transaction mode
Statement stmt = conn.createStatement();
stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");
conn.commit(); // Commit the transaction
} catch (SQLException e) {
conn.rollback(); // Roll back on error
throw e;
} finally {
conn.close();
}
サポートされていない JDBC 操作
対話型トランザクションでは、次の JDBC 操作はサポートされていません。
| カテゴリ | サポートしていません |
|---|---|
| カタログまたはスキーマの切り替え |
Connection.setCatalog() および Connection.setSchema() |
| セッション構成の変更 |
Connection.setClientInfo() のセッション レベルのプロパティ、例えば TIMEZONE や ANSI_MODE の場合 |
| すべての DatabaseMetaData (すべてのプロトコル) | すべての DatabaseMetaData.* メソッド |
| PreparedStatement メタデータ | PreparedStatement.getMetaData() |
| ストアド プロシージャ | CALL procedure_name() |
ODBC で使用する
ODBC ドライバーでは、トランザクション内で SQLExecute() と SQLExecDirect() を使用した DML ステートメントの実行がサポートされています。 サポートされている DML ステートメントの一覧については、「 サポートされる操作」を参照してください。
ODBC クライアントは、標準の ODBC トランザクション管理機能を使用して、Azure Databricks ODBC ドライバーで対話型トランザクションを使用できます。
サポートされていない ODBC 操作
対話型トランザクションでは、次の ODBC 操作はサポートされていません。
| カテゴリ | サポートしていません |
|---|---|
| すべてのカタログ関数 |
SQLTables、SQLColumns、SQLStatistics、SQLSpecialColumns、SQLPrimaryKeys、SQLForeignKeys、SQLTablePrivileges、SQLColumnPrivileges、SQLProcedures、SQLProcedureColumns |
| 接続属性の設定 |
SQLSetConnectAttr() を使用したカタログ切り替え、分離レベルの変更、アクセスモードの変更 |
| SQL 変換 | SQLNativeSql |
Databricks SQL Connector for Python で使用する
Databricks SQL Connector for Python では、トランザクション内で cursor.execute() を使用した DML ステートメントの実行がサポートされています。 サポートされている DML ステートメントの一覧については、「 サポートされる操作」を参照してください。
Python アプリケーションでは、次の設定することで、autocommit=False で対話型トランザクションを使用できます。
from databricks import sql
with sql.connect(
server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
http_path="sql/1.0/warehouses/abc123def456",
access_token="your-access-token",
autocommit=False
) as connection:
with connection.cursor() as cursor:
cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
connection.commit()
サポートされていない Python コネクタ操作
対話型トランザクションでは、次の Python コネクタ操作はサポートされていません。
| カテゴリ | サポートしていません |
|---|---|
| すべてのメタデータ |
cursor.catalogs()、 cursor.schemas()、 cursor.tables()、 cursor.columns() |
対話型トランザクションのドライバーの制限事項
対話型トランザクションを使用する場合、すべてのドライバーに次の制限が適用されます。
メタデータ操作は、対話型トランザクション内ではサポートされていません。 ドライバーまたはプロトコルに関係なく、トランザクション内で次の操作が失敗する可能性があります。
| ドライバー/プロトコル | タイプ | メソッド |
|---|---|---|
| JDBC | DatabaseMetaData |
getCatalogs()、getSchemas()、getTables()、getColumns()、getTypeInfo() |
| ODBC | カタログ関数 |
SQLTables、SQLColumns、SQLGetTypeInfo |
| Python コネクタ | メタデータ メソッド |
cursor.catalogs()、 cursor.schemas()、 cursor.tables()、 cursor.columns() |
| SQL | メタデータ コマンド |
SHOW TABLES、SHOW DATABASES、DESCRIBE TABLE、USE CATALOG、USE SCHEMA |
| SQL | information_schema |
SELECT
information_schema テーブルに対するクエリ |
トランザクションの外部ですべてのメタデータ操作を実行します。
Warnung
1 つのドライバー接続オブジェクトで複数のスレッドでトランザクションを実行すると、未定義の動作が発生します。 各接続オブジェクトで一度に 1 つのトランザクションのみを実行します。
分離の動作
対話型トランザクションのコミットされていない変更は、セッションにのみ表示されます。 他のセッションでは、トランザクションが開始される前と同じテーブルの状態が表示されます。
注
対話型トランザクションでは、非対話型トランザクションよりも保守的な競合検出が使用され、テーブル レベルで競合する可能性があります (無条件の追加を除く)。 行レベルの競合検出には、非対話型トランザクション (BEGIN ATOMIC ... END;) を使用します。
- 分離を確認するには、サンプル テーブルが存在しない場合は作成します。
CREATE TABLE IF NOT EXISTS sample_accounts (
id INT,
account_name STRING,
balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
同じ セッションで、トランザクションを開始し、変更を加えます。
BEGIN TRANSACTION; INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);別の SQL エディター タブまたはノートブック セッション (同じノートブック内の新しいセルではない) で、テーブルに対してクエリを実行します。
-- Run this in the SECOND session SELECT * FROM sample_accounts WHERE id = 10;コミットされていない変更は最初のセッションの外部には表示されないため、0 行が返されます。
最初のセッションに戻り、コミットを実行します。
COMMIT;もう一度 2 番目のセッションからクエリを実行します。
-- Run this in the SECOND session SELECT * FROM sample_accounts WHERE id = 10;トランザクションがコミットされているため、行が表示されます。
この分離により、他のユーザーがロールバックされる可能性のあるデータを読み取りできなくなります。
トランザクション モードを選択する
| シナリオ | 推奨モード |
|---|---|
| スケジュールされた ETL 処理 | 非対話型 — 自動コミットまたはロールバックにより、エラー処理が簡略化されます |
| ステートメント シーケンスを修正しました | 非対話型 -単純な構文、手動コミットは必要ありません |
| コミット前のデータ検証 | 対話型 - 結果を検査し、コミットするかどうかを決定する |
| 手動制御が必要な JDBC アプリケーション | 対話型 - 標準のデータベース トランザクション パターン |
次のステップ
関連する SQL リファレンス
- ATOMIC 複合ステートメント (非対話型トランザクション):自動コミットとロールバックを使用して、複数の SQL ステートメントを 1 つのアトミック トランザクションとして実行します。
- BEGIN TRANSACTION (対話型トランザクション): 手動コミットとロールバック制御を使用して対話型トランザクションを開始します。
- COMMIT: 対話型トランザクションをコミットし、すべての変更を永続的にします。
- ROLLBACK: 対話型トランザクションをロールバックし、すべての変更を破棄します。