Tutorial: Mengoordinasikan transaksi di seluruh tabel

Penting

Transaksi yang menulis ke tabel Delta terkelola Katalog Unity ada di Pratinjau Umum.

Transaksi yang menulis ke tabel Iceberg yang dikelola oleh Unity Catalog ada di Pratinjau Privat. Untuk bergabung dengan pratinjau ini, kirim formulir pendaftaran pratinjau tabel Iceberg terkelola.

Tutorial ini menunjukkan cara menggunakan transaksi untuk mengoordinasikan pembaruan di beberapa pernyataan dan tabel. Anda mempelajari kedua mode transaksi: transaksi non-interaktif, yang melakukan transaksi secara otomatis, dan interaktif, yang memberi Anda kontrol eksplisit. Tutorial ini juga menunjukkan penggunaan transaksi dengan prosedur tersimpan dan SQL Scripting untuk membangun beban kerja pergudangan misi penting pada Azure Databricks.

Persyaratan

  • Environment: Akses ke ruang kerja Azure Databricks.
  • Komputasi: Jenis komputasi yang didukung bervariasi menurut mode transaksi:
  • Hak istimewa: CREATE TABLE dalam skema Katalog Unity .

Menyiapkan tabel sampel

Semua tabel yang digunakan dalam multi-pernyataan, transaksi dengan banyak tabel harus memenuhi syarat:

  • Jadilah tabel terkelola Unity Catalog (Delta atau Iceberg)
  • Telah diaktifkan komit Katalog

Buat dua tabel sampel di Editor SQL atau buku catatan:

-- Create a table for account data
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- Create a table for transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
  id INT,
  account_id INT,
  transaction_type STRING,
  amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- To upgrade an existing table, use:
-- ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

-- Insert sample data
INSERT INTO sample_accounts VALUES
  (1, 'Alice', 1000.00),
  (2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
  (1, 1, 'deposit', 100.00);

Verifikasi penyiapan:

SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

Hasil:

sample_accounts:
id	account_name	balance
1	  Alice	        1000.00
2	  Bob	          500.00

sample_transactions:
id	account_id	transaction_type	amount
1	           1	         deposit	100.00

Transaksi non-interaktif

Transaksi non-interaktif menggunakan BEGIN ATOMIC ... END; sintaksis. Semua pernyataan berjalan sebagai unit atom tunggal. Jika setiap pernyataan berhasil, Azure Databricks melakukan commit secara otomatis. Jika ada pernyataan yang gagal, Azure Databricks mengembalikan semua perubahan secara otomatis. Untuk sintaks terperinci dan pola penggunaan, lihat transaksi non-interaktif.

Menjalankan transaksi yang berhasil

Perbarui kedua tabel secara atomik:

BEGIN ATOMIC
  -- Update Alice's account balance
  UPDATE sample_accounts
  SET balance = balance + 100.00
  WHERE id = 1;

  -- Record the deposit transaction
  INSERT INTO sample_transactions
  VALUES (2, 1, 'deposit', 100.00);
END;

Pastikan kedua operasi telah berhasil:

-- Alice's balance should now be 1100.00
SELECT * FROM sample_accounts WHERE id = 1;

-- Should show two transaction records
SELECT * FROM sample_transactions;

Pembaruan saldo dan catatan transaksi dibuat bersama-sama. Jika salah satu pernyataan gagal, perubahan tidak akan dilakukan, dan Databricks akan mengakhiri transaksi tanpa efek samping.

Menggunakan SIGNAL untuk menggagalkan transaksi berdasarkan kondisi

Anda dapat menggunakan SIGNAL di dalam BEGIN ATOMIC ... END; blok untuk gagal dalam transaksi saat kondisi yang ditentukan pengguna tidak terpenuhi. Ini berguna untuk validasi data sebelum melakukan:

BEGIN ATOMIC
  -- Insert new account
  INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

  -- Fail the transaction if balance is negative
  IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
  END IF;
END;

SIGNAL memunculkan kesalahan, yang menyebabkan seluruh transaksi di-rollback secara otomatis. Verifikasi bahwa sisipan digulung balik:

-- Should return 0 rows (the transaction was rolled back by SIGNAL)
SELECT * FROM sample_accounts WHERE id = 3;

Lihat pemulihan otomatis saat gagal

Jalankan transaksi dengan pernyataan yang tidak valid:

BEGIN ATOMIC
  -- This statement is valid
  INSERT INTO sample_accounts VALUES (4, 'David', 300.00);

  -- This statement will fail (table does not exist)
  INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

Transaksi gagal dengan kesalahan. Verifikasi bahwa pernyataan pertama digulung balik:

-- Should return 0 rows because the transaction was rolled back
SELECT * FROM sample_accounts WHERE id = 4;

Meskipun pernyataan pertama INSERT valid, itu digulung balik karena pernyataan kedua gagal. Ini menunjukkan jaminan keberhasilan atau kegagalan total dari transaksi.

Transaksi interaktif

Transaksi interaktif memberi Anda kontrol eksplisit untuk kapan mengonfirmasi atau menggulung balik. Gunakan BEGIN TRANSACTION untuk memulai, lalu COMMIT untuk menyimpan perubahan atau ROLLBACK untuk membuangnya.

Menerapkan perubahan

Mulai transaksi:

BEGIN TRANSACTION;

Buat perubahan (belum dilakukan):

INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

Terapkan untuk membuat perubahan permanen:

COMMIT;

Verifikasi perubahan:

-- Eve's account should now be visible
SELECT * FROM sample_accounts WHERE id = 5;

-- Bob's balance should be 550.00 (500 + 50)
SELECT * FROM sample_accounts WHERE id = 2;

Batalkan perubahan

Mulai transaksi baru:

BEGIN TRANSACTION;

Buat perubahan:

INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

Pastikan perubahan terlihat di sesi Anda:

-- Should show Frank's account (visible in your session only)
SELECT * FROM sample_accounts WHERE id = 6;

Gulung balik untuk membuang perubahan:

ROLLBACK;

Pastikan perubahan telah dibuang:

-- Should return 0 rows (the insert was rolled back)
SELECT * FROM sample_accounts WHERE id = 6;

Gunakan dengan prosedur tersimpan dan Pembuatan Skrip SQL

Anda dapat menggabungkan transaksi dengan prosedur tersimpan untuk membuat logika transaksi yang dapat digunakan kembali. Pola ini berguna untuk operasi kompleks yang sering Anda jalankan.

  1. Buat tabel dengan pengaktifan komit katalog

    CREATE SCHEMA IF NOT EXISTS main.retail;
    
    CREATE TABLE IF NOT EXISTS main.retail.orders (
      order_id STRING,
      customer_id STRING,
      amount DECIMAL(18,2)
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE IF NOT EXISTS main.retail.orders_staging (
      order_id STRING,
      customer_id STRING,
      amount DECIMAL(18,2),
      batch_id STRING
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE IF NOT EXISTS main.retail.total_sales (
      customer_id STRING,
      total_amount DECIMAL(18,2)
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
  2. Definisikan prosedur tersimpan

    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
        IN  p_order_id      STRING,
        IN  p_customer_id   STRING,
        IN  p_order_amount  DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
        -- Insert the order
        INSERT INTO main.retail.orders (order_id, customer_id, amount)
        VALUES (p_order_id, p_customer_id, p_order_amount);
    
        -- Update total sales per customer
        MERGE INTO main.retail.total_sales AS t
        USING (
            SELECT
              p_customer_id  AS customer_id,
              p_order_amount AS order_amount
        ) s
          ON t.customer_id = s.customer_id
        WHEN MATCHED THEN
          UPDATE SET t.total_amount = t.total_amount + s.order_amount
        WHEN NOT MATCHED THEN
          INSERT (customer_id, total_amount)
          VALUES (s.customer_id, s.order_amount);
    END;
    
    
  3. Tentukan transaksi

    BEGIN ATOMIC
        -- Staging batch id for this transaction
        DECLARE new_order_id STRING DEFAULT uuid();
        DECLARE v_batch_id STRING DEFAULT uuid();
    
        -- 1) Stage incoming customer and order rows
        INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
        VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);
    
        -- 2) Drive final writes from staging to production via stored procedure
        FOR o AS
          SELECT
            order_id,
            customer_id,
            amount
          FROM main.retail.orders_staging
          WHERE batch_id = v_batch_id
        DO
            CALL main.retail.apply_order(
              o.order_id,
              o.customer_id,
              o.amount
            );
        END FOR;
    
        -- 3) Clean up processed staging rows
        DELETE FROM main.retail.orders_staging
        WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction
    
    

Jika ada bagian dari transaksi yang gagal, Databricks akan mengembalikan semua perubahan secara otomatis.

Bersihkan

Hapus tabel sampel:

DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;

DROP TABLE IF EXISTS main.retail.orders;
DROP TABLE IF EXISTS main.retail.orders_staging;
DROP TABLE IF EXISTS main.retail.total_sales;

Langkah berikutnya