Oktatóanyag: Tranzakciók koordinálása táblák között

Fontos

A Unity Catalog által felügyelt Delta-táblákba írt tranzakciók nyilvános előzetes verzióban érhetők el.

A Unity Catalog által felügyelt Iceberg-táblákba írt tranzakciók privát előzetes verzióban érhetők el. Az előzetes verzióhoz való csatlakozáshoz küldje el a felügyelt Iceberg-táblák előzetes regisztrációs űrlapot.

Ez az oktatóanyag bemutatja, hogyan lehet tranzakciók használatával összehangolni a frissítéseket több utasítás és tábla között. Mindkét tranzakciós módot elsajátíthatja: az automatikusan véglegesítést biztosító nem interaktív tranzakciókat és az interaktív tranzakciókat, amelyek explicit vezérlést biztosítanak. Az oktatóanyag azt is bemutatja, hogy a tranzakciók tárolt eljárásokkal és SQL Scriptinggel történő használatával kritikus fontosságú adattárházi számítási feladatokat hozhat létre Azure Databricks.

Követelmények

  • Környezet: Hozzáférés Azure Databricks munkaterülethez.
  • Számítás: A támogatott számítási típusok tranzakciós mód szerint változnak:
    • A klasszikus vagy kiszolgáló nélküli SQL Warehouse mindkét tranzakciós módot támogatja.
    • A kiszolgáló nélküli számítás csak a nem interaktív tranzakciókat támogatja.
    • A Databricks Runtime 18.0 vagy annál újabb verzióját futtató klasszikus klaszterek csak a nem interaktív tranzakciókat támogatják.
  • Jogosultságok: CREATE TABLEUnity Catalog sémájában.

Mintatáblák beállítása

A többutas, többtáblás tranzakcióban írt összes táblának a következőnek kell lennie:

Hozzon létre két mintatáblát az SQL-szerkesztőben vagy egy jegyzetfüzetben:

-- Create a table for account data
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- Create a table for transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
  id INT,
  account_id INT,
  transaction_type STRING,
  amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- To upgrade an existing table, use:
-- ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

-- Insert sample data
INSERT INTO sample_accounts VALUES
  (1, 'Alice', 1000.00),
  (2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
  (1, 1, 'deposit', 100.00);

Ellenőrizze a telepítést:

SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

kimenet:

sample_accounts:
id	account_name	balance
1	  Alice	        1000.00
2	  Bob	          500.00

sample_transactions:
id	account_id	transaction_type	amount
1	           1	         deposit	100.00

Nem interaktív tranzakciók

A nem interaktív tranzakciók szintaxist használnak BEGIN ATOMIC ... END; . Minden utasítás egyetlen atomegységként fut. Ha minden utasítás sikeres, Azure Databricks automatikusan véglegesíti. Ha bármelyik utasítás meghiúsul, Azure Databricks automatikusan visszaállítja az összes módosítást. A szintaxis és a használati minták részletes leírása: nem interaktív tranzakciók.

Sikeres tranzakció futtatása

Frissítse mindkét táblát atomilag:

BEGIN ATOMIC
  -- Update Alice's account balance
  UPDATE sample_accounts
  SET balance = balance + 100.00
  WHERE id = 1;

  -- Record the deposit transaction
  INSERT INTO sample_transactions
  VALUES (2, 1, 'deposit', 100.00);
END;

Ellenőrizze, hogy mindkét művelet sikeres volt-e:

-- Alice's balance should now be 1100.00
SELECT * FROM sample_accounts WHERE id = 1;

-- Should show two transaction records
SELECT * FROM sample_transactions;

Az egyenlegfrissítés és a tranzakciórekord együtt lett létrehozva. Ha bármelyik utasítás meghiúsult volna, egyik módosítás sem valósult volna meg, és a Databricks a tranzakciót mellékhatások nélkül leállította volna.

A SIGNAL használatával meghiusítható egy tranzakció, ha egy feltétel teljesül.

A tranzakció meghiúsulhat, ha a felhasználó által megadott feltétel nem teljesül, és egy SIGNAL-et használ egy BEGIN ATOMIC ... END; blokkon belül. Ez a véglegesítés előtt hasznos adatérvényesítéshez:

BEGIN ATOMIC
  -- Insert new account
  INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

  -- Fail the transaction if balance is negative
  IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
  END IF;
END;

Ez SIGNAL hibát jelez, amely miatt a teljes tranzakció automatikusan visszagördül. Ellenőrizze, hogy a beszúrás vissza lett-e állítva:

-- Should return 0 rows (the transaction was rolled back by SIGNAL)
SELECT * FROM sample_accounts WHERE id = 3;

Tekintse meg az automatikus visszaállítás sikertelenségét

Tranzakció futtatása érvénytelen utasítással:

BEGIN ATOMIC
  -- This statement is valid
  INSERT INTO sample_accounts VALUES (4, 'David', 300.00);

  -- This statement will fail (table does not exist)
  INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

A tranzakció hiba miatt meghiúsul. Ellenőrizze, hogy az első utasítás vissza lett-e állítva:

-- Should return 0 rows because the transaction was rolled back
SELECT * FROM sample_accounts WHERE id = 4;

Annak ellenére, hogy az első INSERT utasítás érvényes volt, vissza lett állítva, mert a második utasítás nem sikerült. Ez a tranzakciók teljes vagy egyáltalán nem garantált garanciáit mutatja be.

Interaktív tranzakciók

Az interaktív tranzakciók explicit módon szabályozják, hogy mikor kell véglegesíteni vagy visszadobni. A TRANZAKCIÓ INDÍTÁSA parancsot használva kezdje el, majd a COMMIT parancsot a módosítások mentéséhez vagy a ROLLBACK parancsot az elvetésükhöz.

Módosítások véglegesítése

Tranzakció indítása:

BEGIN TRANSACTION;

Módosítások végrehajtása (még nem véglegesített):

INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

A módosítások véglegesítése.

COMMIT;

Ellenőrizze a módosításokat:

-- Eve's account should now be visible
SELECT * FROM sample_accounts WHERE id = 5;

-- Bob's balance should be 550.00 (500 + 50)
SELECT * FROM sample_accounts WHERE id = 2;

Módosítások visszaállítása

Új tranzakció indítása:

BEGIN TRANSACTION;

Módosítsa a következőt:

INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

Ellenőrizze, hogy a módosítás látható-e a munkamenetben:

-- Should show Frank's account (visible in your session only)
SELECT * FROM sample_accounts WHERE id = 6;

Visszaállítás a módosítás elvetéséhez:

ROLLBACK;

Ellenőrizze, hogy a módosítás elvetve lett-e:

-- Should return 0 rows (the insert was rolled back)
SELECT * FROM sample_accounts WHERE id = 6;

Használat tárolt eljárásokkal és SQL-szkriptekkel

A tranzakciókat tárolt eljárásokkal kombinálva újrahasználható tranzakciólogikát hozhat létre. Ez a minta a gyakran futtatott összetett műveletekhez hasznos.

  1. A katalógus véglegesítéseit engedélyező táblák létrehozása

    CREATE SCHEMA IF NOT EXISTS main.retail;
    
    CREATE TABLE IF NOT EXISTS main.retail.orders (
      order_id STRING,
      customer_id STRING,
      amount DECIMAL(18,2)
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE IF NOT EXISTS main.retail.orders_staging (
      order_id STRING,
      customer_id STRING,
      amount DECIMAL(18,2),
      batch_id STRING
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE IF NOT EXISTS main.retail.total_sales (
      customer_id STRING,
      total_amount DECIMAL(18,2)
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
  2. A tárolt eljárás meghatározása

    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
        IN  p_order_id      STRING,
        IN  p_customer_id   STRING,
        IN  p_order_amount  DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
        -- Insert the order
        INSERT INTO main.retail.orders (order_id, customer_id, amount)
        VALUES (p_order_id, p_customer_id, p_order_amount);
    
        -- Update total sales per customer
        MERGE INTO main.retail.total_sales AS t
        USING (
            SELECT
              p_customer_id  AS customer_id,
              p_order_amount AS order_amount
        ) s
          ON t.customer_id = s.customer_id
        WHEN MATCHED THEN
          UPDATE SET t.total_amount = t.total_amount + s.order_amount
        WHEN NOT MATCHED THEN
          INSERT (customer_id, total_amount)
          VALUES (s.customer_id, s.order_amount);
    END;
    
    
  3. A tranzakció meghatározása

    BEGIN ATOMIC
        -- Staging batch id for this transaction
        DECLARE new_order_id STRING DEFAULT uuid();
        DECLARE v_batch_id STRING DEFAULT uuid();
    
        -- 1) Stage incoming customer and order rows
        INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
        VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);
    
        -- 2) Drive final writes from staging to production via stored procedure
        FOR o AS
          SELECT
            order_id,
            customer_id,
            amount
          FROM main.retail.orders_staging
          WHERE batch_id = v_batch_id
        DO
            CALL main.retail.apply_order(
              o.order_id,
              o.customer_id,
              o.amount
            );
        END FOR;
    
        -- 3) Clean up processed staging rows
        DELETE FROM main.retail.orders_staging
        WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction
    
    

Ha a tranzakció bármelyik része meghiúsul, a Databricks automatikusan visszaállítja az összes módosítást.

Tisztítás

Távolítsa el a mintatáblákat:

DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;

DROP TABLE IF EXISTS main.retail.orders;
DROP TABLE IF EXISTS main.retail.orders_staging;
DROP TABLE IF EXISTS main.retail.total_sales;

Következő lépések