Sdílet prostřednictvím


Kurz: Koordinace transakcí napříč tabulkami

Důležité

Transakce, které se zapisují do spravovaných tabulek Delta katalogu Unity, jsou ve verzi Public Preview.

Transakce, které zapisují do katalogu Unity spravované tabulky Iceberg, jsou ve verzi Private Preview. Pokud se chcete připojit k této verzi Preview, odešlete formulář pro registraci spravovaných tabulek Iceberg ve verzi Preview.

Tento kurz ukazuje, jak pomocí transakcí koordinovat aktualizace napříč více příkazy a tabulkami. Naučíte se oba režimy transakcí: neinteraktivní transakce, které se potvrdí automaticky, a interaktivní transakce, které vám dávají explicitní kontrolu. Tento kurz také ukazuje použití transakcí s uloženými procedurami a skriptováním SQL k sestavení klíčových úloh skladových zásob v Azure Databricks.

Požadavky

  • Prostředí: Přístup k pracovnímu prostoru Azure Databricks
  • Výpočty: Podporované typy výpočetních prostředků se liší podle režimu transakcí:
  • Oprávnění: CREATE TABLE ve schématu Unity Catalog.

Nastavení ukázkových tabulek

Všechny tabulky, které jsou zapsány ve více příkazech v rámci transakce s více tabulkami, musí:

V editoru SQL nebo poznámkovém bloku vytvořte dvě ukázkové tabulky:

-- Create a table for account data
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- Create a table for transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
  id INT,
  account_id INT,
  transaction_type STRING,
  amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- To upgrade an existing table, use:
-- ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

-- Insert sample data
INSERT INTO sample_accounts VALUES
  (1, 'Alice', 1000.00),
  (2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
  (1, 1, 'deposit', 100.00);

Ověřte nastavení:

SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

Výstup:

sample_accounts:
id	account_name	balance
1	  Alice	        1000.00
2	  Bob	          500.00

sample_transactions:
id	account_id	transaction_type	amount
1	           1	         deposit	100.00

Neinteraktivní transakce

Neinteraktivní transakce používají BEGIN ATOMIC ... END; syntaxi. Všechny příkazy běží jako jedna atomická jednotka. Pokud je každý příkaz úspěšný, proběhne automatické potvrzení Azure Databricks. Pokud některý z příkazů selže, Azure Databricks vrátí všechny změny automaticky. Podrobné vzorce syntaxe a použití najdete v neinteraktivních transakcích.

Spuštění úspěšné transakce

Aktualizujte atomickým způsobem obě tabulky:

BEGIN ATOMIC
  -- Update Alice's account balance
  UPDATE sample_accounts
  SET balance = balance + 100.00
  WHERE id = 1;

  -- Record the deposit transaction
  INSERT INTO sample_transactions
  VALUES (2, 1, 'deposit', 100.00);
END;

Ověřte, že obě operace uspěly.

-- Alice's balance should now be 1100.00
SELECT * FROM sample_accounts WHERE id = 1;

-- Should show two transaction records
SELECT * FROM sample_transactions;

Aktualizace zůstatku i záznam transakce byly vytvořeny společně. Pokud by některý z příkazů selhal, nebyla by potvrzena žádná změna a Databricks by ukončil transakci bez vedlejších účinků.

Použijte SIGNAL k selhání transakce na základě podmínky

Uvnitř bloku BEGIN ATOMIC ... END; můžete použít SIGNAL k selhání transakce, pokud není splněna podmínka definovaná uživatelem. To je užitečné pro ověření dat před potvrzením:

BEGIN ATOMIC
  -- Insert new account
  INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

  -- Fail the transaction if balance is negative
  IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
  END IF;
END;

SIGNAL vyvolá chybu, která způsobí, že je celá transakce automaticky zrušena. Ověřte, že se vložení vrátilo zpět:

-- Should return 0 rows (the transaction was rolled back by SIGNAL)
SELECT * FROM sample_accounts WHERE id = 3;

Zobrazení automatického vrácení zpět při selhání

Spusťte transakci s neplatným příkazem:

BEGIN ATOMIC
  -- This statement is valid
  INSERT INTO sample_accounts VALUES (4, 'David', 300.00);

  -- This statement will fail (table does not exist)
  INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

Transakce selže s chybou. Ověřte, že první příkaz byl vrácen zpět:

-- Should return 0 rows because the transaction was rolled back
SELECT * FROM sample_accounts WHERE id = 4;

I když byl první INSERT příkaz platný, vrátil se zpět, protože druhý příkaz selhal. To ukazuje záruku transakcí typu vše nebo nic.

Interaktivní transakce

Interaktivní transakce poskytují explicitní kontrolu nad tím, kdy se mají potvrdit nebo vrátit zpět. Pomocí BEGIN TRANSACTION spuštěte transakci, poté použijte COMMIT k uložení změn nebo ROLLBACK k jejich zrušení.

Potvrzení změn

Zahájení transakce:

BEGIN TRANSACTION;

Proveďte změny (dosud nepotvrzené):

INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

Potvrďte, že provedete trvalé změny:

COMMIT;

Ověřte změny:

-- Eve's account should now be visible
SELECT * FROM sample_accounts WHERE id = 5;

-- Bob's balance should be 550.00 (500 + 50)
SELECT * FROM sample_accounts WHERE id = 2;

Vrátit změny

Spusťte novou transakci:

BEGIN TRANSACTION;

Proveďte změnu:

INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

Ověřte, zda je změna viditelná ve vaší relaci:

-- Should show Frank's account (visible in your session only)
SELECT * FROM sample_accounts WHERE id = 6;

Vraťte se zpět a zahoďte změnu:

ROLLBACK;

Ujistěte se, že byla změna zrušena.

-- Should return 0 rows (the insert was rolled back)
SELECT * FROM sample_accounts WHERE id = 6;

Použití s uloženými procedurami a skriptováním SQL

Transakce můžete kombinovat s uloženými procedurami a vytvořit opakovaně použitelnou logiku transakce. Tento model je užitečný pro složité operace, které spouštíte často.

  1. Vytvořte dvě tabulky s povolenými potvrzeními spravovanými katalogem

    
    CREATE TABLE orders (order_id STRING, item_sku STRING, quantity_ordered INT)
     TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE inventory (item_sku STRING, quantity_in_stock INT)
     TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    
  2. Definování uložené procedury

    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
        IN  p_order_id      STRING,
        IN  p_customer_id   STRING,
        IN  p_order_amount  DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
        -- Insert the order
        INSERT INTO main.retail.orders (order_id, customer_id, amount)
        VALUES (p_order_id, p_customer_id, p_order_amount);
    
        -- Update total sales per customer
        MERGE INTO main.retail.total_sales AS t
        USING (
            SELECT
              p_customer_id  AS customer_id,
              p_order_amount AS order_amount
        ) s
          ON t.customer_id = s.customer_id
        WHEN MATCHED THEN
          UPDATE SET t.total_amount = t.total_amount + s.order_amount
        WHEN NOT MATCHED THEN
          INSERT (customer_id, total_amount)
          VALUES (s.customer_id, s.order_amount);
    END;
    
    
  3. Definování transakce

    BEGIN ATOMIC
        -- Staging batch id for this transaction
        DECLARE new_order_id STRING DEFAULT uuid();
        DECLARE v_batch_id STRING DEFAULT uuid();
    
        -- 1) Stage incoming customer and order rows
        INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
        VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);
    
        -- 2) Drive final writes from staging to production via stored procedure
        FOR o AS
          SELECT
            order_id,
            customer_id,
            amount
          FROM main.retail.orders_staging
          WHERE batch_id = v_batch_id
        DO
            CALL main.retail.apply_order(
              o.order_id,
              o.customer_id,
              o.amount
            );
        END FOR;
    
        -- 3) Clean up processed staging rows
        DELETE FROM main.retail.orders_staging
        WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction
    
    

Pokud jakákoli část transakce selže, Databricks vrátí zpět všechny změny automaticky.

Uklidit

Odeberte ukázkové tabulky:

DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;


DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS inventory;

Další kroky