Mode transaksi

Penting

Transaksi yang menulis ke tabel Delta terkelola Katalog Unity ada di Pratinjau Umum.

Transaksi yang menulis ke tabel Iceberg yang dikelola oleh Unity Catalog ada di Pratinjau Privat. Untuk bergabung dengan pratinjau ini, kirim formulir pendaftaran pratinjau tabel Iceberg terkelola.

Transaksi mendukung dua mode: non-interaktif dan interaktif. Halaman ini mencakup kapan menggunakan setiap mode dan menyertakan contoh implementasi.

Untuk persyaratan dan gambaran umum transaksi, lihat Transaksi. Untuk praktik langsung dengan kedua mode, lihat Tutorial: Mengoordinasikan transaksi di seluruh tabel.

Nota

Semua tabel yang digunakan dalam multi-pernyataan, transaksi dengan banyak tabel harus memenuhi syarat:

  • Jadilah tabel terkelola Unity Catalog (Delta atau Iceberg)
  • Telah diaktifkan komit Katalog

Transaksi non-interaktif

Transaksi non-interaktif menggunakan pembuatan skrip SQL dengan ATOMIC kata kunci. Blok pernyataan senyawa ATOMIC menjalankan semua pernyataan sebagai satu unit atomik. Semua berhasil bersama-sama atau semua gagal bersama-sama.

Komputasi yang didukung: Setiap gudang SQL, komputasi tanpa server, atau kluster yang menjalankan Databricks Runtime 18.0 ke atas.

Sintaks yang didukung: Mendukung blok SQL, Scala spark.sql , dan blok PySpark spark.sql .

Nota

Anda dapat menggunakan transaksi non-interaktif dalam Streaming forEachBatch Terstruktur dengan memanggil spark.sql("BEGIN ATOMIC ... END;"). Namun, titik pemeriksaan Streaming Terstruktur tidak maju secara transaksional.

Sintaks

BEGIN ATOMIC
  statement1;
  statement2;
  statement3;
END;

Azure Databricks secara otomatis menerapkan semua perubahan jika semua pernyataan berhasil. Jika ada pernyataan yang gagal, Azure Databricks secara otomatis mengembalikan semua perubahan.

Gunakan di editor SQL

Jalankan transaksi non-interaktif langsung di Editor SQL. Pilih seluruh blok pernyataan senyawa ATOMIC dan jalankan sebagai satu pernyataan:

BEGIN ATOMIC
  DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;

  INSERT INTO staging_sales
  SELECT * FROM raw_sales WHERE load_date = current_date();

  MERGE INTO sales AS target
  USING staging_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Gunakan di buku catatan

Jalankan transaksi non-interaktif di notebook menggunakan sel SQL atau API terprogram.

SQL

BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;

Python

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Scala

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Gunakan dalam pekerjaan terjadwal

Transaksi non-interaktif bekerja dengan baik dalam pekerjaan terjadwal karena secara otomatis menangani penerapan dan pembatalan:

BEGIN ATOMIC
  -- Clear previous staging data
  DELETE FROM staging_daily_sales WHERE load_date = current_date();

  -- Load new data
  INSERT INTO staging_daily_sales
  SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
  FROM raw_sales
  WHERE sale_date = current_date() - INTERVAL 1 DAY;

  -- Validate row count (fails transaction if no data)
  IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
  END IF;

  -- Merge into production
  MERGE INTO daily_sales AS target
  USING staging_daily_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Jika ada pernyataan yang gagal, termasuk asersi, seluruh transaksi akan dilakukan rollback secara otomatis.

Gunakan dengan JDBC

Klien eksternal dapat menjalankan transaksi non-interaktif.

JDBC

String sql = """
    BEGIN ATOMIC
      INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
      UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
    END;
    """;

Statement stmt = conn.createStatement();
stmt.execute(sql);

Gunakan dengan API Eksekusi Pernyataan

Jalankan transaksi non-interaktif menggunakan API Eksekusi Pernyataan:

import requests

sql = """
BEGIN ATOMIC
  INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
  UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""

response = requests.post(
    f"{workspace_url}/api/2.0/sql/statements",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "warehouse_id": warehouse_id,
        "statement": sql,
        "wait_timeout": "30s"
    }
)

Pola ETL

Pola berikut menunjukkan alur kerja ETL umum menggunakan transaksi non-interaktif.

Pola penahapan dan validasi

Pola ini memuat data ke area penahapan, memvalidasi kualitas data, dan menggabungkan rekaman yang divalidasi ke dalam tabel produksi:

BEGIN ATOMIC
  -- Load into staging
  INSERT INTO staging_customers
  SELECT * FROM external_source
  WHERE ingest_date = current_date();

  -- Validate data quality
  IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
  END IF;

  -- Merge validated data
  MERGE INTO customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Update metadata
  UPDATE etl_metadata
  SET last_load_date = current_date(),
      rows_processed = (SELECT COUNT(*) FROM staging_customers)
  WHERE table_name = 'customers';
END;

Pola tabel dimensi dan fakta

Pola ini memperbarui tabel dimensi sebelum memuat tabel fakta untuk mempertahankan integritas referensial:

BEGIN ATOMIC
  -- Update dimension tables first
  MERGE INTO dim_products AS target
  USING staging_products AS source
  ON target.product_id = source.product_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  MERGE INTO dim_customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Then load fact table with foreign key references
  INSERT INTO fact_sales
  SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
  FROM staging_sales s
  JOIN dim_products p ON s.product_id = p.product_id
  JOIN dim_customers c ON s.customer_id = c.customer_id;
END;

Penanganan kesalahan

Saat pernyataan gagal dalam blok BEGIN ATOMIC ... END;, Azure Databricks mengembalikan semua perubahan dan mengembalikan pesan kesalahan.

Tips penelusuran kesalahan:

  1. Tinjau pesan kesalahan untuk mengidentifikasi pernyataan mana yang gagal.
  2. Menguji pernyataan satu per satu di luar blok transaksi.
  3. Tambahkan pemeriksaan validasi menggunakan SIGNAL untuk gagal dengan pesan kesalahan kustom.
  4. Kueri riwayat transaksi untuk mendapatkan konteks tambahan.

Transaksi interaktif

Transaksi interaktif memberi Anda kontrol eksplisit atas batas transaksi. Anda memulai transaksi secara manual, menjalankan statement, dan secara eksplisit memproses atau membatalkan.

Komputasi yang didukung: Hanya gudang SQL .

Sintaks yang didukung: SQL saja.

Sintaks

BEGIN TRANSACTION;

statement1;
statement2;

COMMIT;
-- or: ROLLBACK;

Memvalidasi sebelum menerapkan

Gunakan transaksi interaktif untuk memvalidasi hasil sebelum melakukan:

BEGIN TRANSACTION;

-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();

-- Validate and commit or rollback
BEGIN
  DECLARE duplicate_count INT;
  SET duplicate_count = (
    SELECT COUNT(*) FROM (
      SELECT customer_id, COUNT(*) as cnt
      FROM staging_customers
      WHERE load_date = current_date()
      GROUP BY customer_id
      HAVING COUNT(*) > 1
    )
  );

  IF duplicate_count > 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
  ELSE
    MERGE INTO customers AS target
    USING staging_customers AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;
    COMMIT;
  END IF;
END;

Pengembalian eksplisit

Memulihkan transaksi saat validasi gagal atau logika bisnis mengharuskan perubahan dibatalkan.

BEGIN TRANSACTION;

UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;

-- Check if quantity would go negative
BEGIN
  DECLARE new_quantity INT;
  SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);

  IF new_quantity < 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
  ELSE
    COMMIT;
  END IF;
END;

Gunakan dengan JDBC

Driver JDBC mendukung menjalankan pernyataan DML menggunakan executeUpdate() dalam transaksi. Untuk daftar pernyataan DML yang didukung, lihat Operasi yang didukung.

Klien JDBC menggunakan transaksi interaktif dengan menonaktifkan mode penerapan otomatis:

Connection conn = DriverManager.getConnection(jdbcUrl, properties);

try {
    conn.setAutoCommit(false);  // Start transaction mode
    Statement stmt = conn.createStatement();

    stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
    stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");

    conn.commit();  // Commit the transaction

} catch (SQLException e) {
    conn.rollback();  // Roll back on error
    throw e;
} finally {
    conn.close();
}

Operasi JDBC yang tidak didukung

Operasi JDBC berikut ini tidak didukung dalam transaksi interaktif:

Kategori Tidak didukung
Pengalihan katalog atau skema Connection.setCatalog() dan Connection.setSchema()
Perubahan konfigurasi sesi Connection.setClientInfo() untuk properti tingkat sesi seperti TIMEZONE dan ANSI_MODE
Semua DatabaseMetaData (semua protokol) Semua DatabaseMetaData.* metode
Metadata untuk PreparedStatement PreparedStatement.getMetaData()
Prosedur yang disimpan CALL procedure_name()

Gunakan dengan ODBC

Driver ODBC mendukung menjalankan pernyataan DML menggunakan SQLExecute() dan SQLExecDirect() dalam transaksi. Untuk daftar pernyataan DML yang didukung, lihat Operasi yang didukung.

Klien ODBC dapat menggunakan transaksi interaktif dengan driver ODBC Azure Databricks menggunakan fungsi manajemen transaksi ODBC standar.

Nota

AutoCommit harus dinonaktifkan untuk menggunakan transaksi. UseNativeQuery harus diatur ke 1 untuk menonaktifkan AutoCommit saat runtime.

Operasi ODBC yang tidak didukung

Operasi ODBC berikut ini tidak didukung dalam transaksi interaktif:

Kategori Tidak didukung
Semua fungsi katalog SQLTables, SQLColumns, SQLStatistics, SQLSpecialColumns, SQLPrimaryKeys, SQLForeignKeys, SQLTablePrivileges, SQLColumnPrivileges, SQLProcedures, SQLProcedureColumns
Mengatur atribut koneksi Pengalihan katalog, perubahan tingkat isolasi, dan perubahan mode akses menggunakan SQLSetConnectAttr()
Terjemahan SQL SQLNativeSql

Gunakan dengan Konektor SQL Databricks untuk Python

Konektor SQL Databricks untuk Python mendukung menjalankan pernyataan DML menggunakan cursor.execute() dalam transaksi. Untuk daftar pernyataan DML yang didukung, lihat Operasi yang didukung.

aplikasi Python dapat menggunakan transaksi interaktif dengan Konektor SQL Databricks untuk Python dengan mengatur autocommit=False:

from databricks import sql

with sql.connect(
    server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
    http_path="sql/1.0/warehouses/abc123def456",
    access_token="your-access-token",
    autocommit=False
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
        connection.commit()

Operasi konektor Python yang tidak didukung

Operasi konektor Python berikut tidak didukung dalam transaksi interaktif:

Kategori Tidak didukung
Semua metadata cursor.catalogs(),cursor.schemas(),cursor.tables(),cursor.columns()

Batasan driver untuk transaksi interaktif

Batasan berikut berlaku untuk semua driver saat menggunakan transaksi interaktif.

Operasi metadata tidak didukung di dalam transaksi interaktif. Operasi berikut mungkin gagal dalam transaksi terlepas dari driver atau protokol:

Driver/Protokol Tipe Metode
JDBC DatabaseMetaData getCatalogs(), getSchemas(), getTables(), getColumns(), getTypeInfo()
ODBC Fungsi katalog SQLTables, SQLColumns, SQLGetTypeInfo
konektor Python Metode metadata cursor.catalogs(),cursor.schemas(),cursor.tables(),cursor.columns()
SQL Perintah metadata SHOW TABLES, SHOW DATABASES, DESCRIBE TABLE, USE CATALOG, USE SCHEMA
SQL information_schema SELECT kueri terhadap information_schema tabel

Jalankan semua operasi metadata di luar transaksi.

Peringatan

Menjalankan transaksi pada beberapa utas pada satu objek koneksi driver menyebabkan perilaku yang tidak terdefinisi. Jalankan hanya satu transaksi pada satu waktu pada setiap objek koneksi.

Perilaku isolasi

Perubahan yang belum dipastikan dalam transaksi interaktif hanya terlihat oleh sesi Anda. Sesi lain melihat status tabel seperti sebelum transaksi Anda dimulai.

Nota

Transaksi interaktif menggunakan deteksi konflik yang lebih konservatif daripada transaksi non-interaktif dan dapat bertentangan di tingkat tabel (kecuali penambahan tanpa syarat). Untuk deteksi konflik tingkat baris, gunakan transaksi non-interaktif (BEGIN ATOMIC ... END;).

  1. Untuk memverifikasi isolasi, buat tabel sampel jika tidak ada:
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
  1. Dalam sesi yang sama, mulai transaksi dan buat perubahan:

    BEGIN TRANSACTION;
    INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);
    
  2. Di tab Editor SQL terpisah atau sesi buku catatan (bukan sel baru di buku catatan yang sama), kueri tabel:

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    Ini mengembalikan 0 baris karena perubahan yang tidak dilakukan tidak terlihat di luar sesi pertama Anda.

  3. Kembali ke sesi pertama Anda dan terapkan:

    COMMIT;
    
  4. Kueri dari sesi kedua lagi:

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    Baris terlihat karena transaksi telah dilakukan.

Isolasi ini mencegah pengguna lain membaca data yang mungkin digulung balik.

Pilih mode transaksi

Skenario Mode yang disarankan
Pekerjaan ETL terjadwal Non-interaktif—komit otomatis atau rollback menyederhanakan penanganan kesalahan
Urutan pernyataan tetap Non-interaktif—sintaks yang lebih sederhana, tidak diperlukan penerapan manual
Validasi data sebelum penerapan Interaktif—periksa hasil dan putuskan apakah akan berkomitmen
Aplikasi JDBC membutuhkan kontrol manual Interaktif—pola transaksi database standar

Langkah berikutnya