Поделиться через


Руководство. Координация транзакций между таблицами

Это важно

Транзакции, записываемые в управляемые таблицы Delta каталога Unity, находятся в общедоступной предварительной версии.

Транзакции, записываемые в управляемые таблицы Iceberg каталога Unity, находятся в закрытой предварительной версии. Чтобы присоединиться к этой предварительной версии, отправьте форму регистрации на предварительный просмотр управляемых таблиц Iceberg.

В этом руководстве показано, как использовать транзакции для координации обновлений, выполняемых в нескольких инструкциях и таблицах. Вы узнаете оба режима транзакций: неинтерактивные транзакции, которые автоматически фиксируются и интерактивные транзакции, что дает явный контроль. В этом руководстве также показано использование транзакций с хранимыми процедурами и скриптами SQL для создания критически важных рабочих нагрузок хранения в Azure Databricks.

Требования

  • Среда: доступ к рабочей области Azure Databricks.
  • Вычисления: поддерживаемые типы вычислений зависят от режима транзакции:
  • Привилегии: CREATE TABLE в схеме каталога Unity .

Настройка примеров таблиц

Все таблицы, записанные в многооперационную, многотабличную транзакцию, должны:

Создайте две примеры таблиц в редакторе SQL или записной книжке:

-- Create a table for account data
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- Create a table for transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
  id INT,
  account_id INT,
  transaction_type STRING,
  amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- To upgrade an existing table, use:
-- ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

-- Insert sample data
INSERT INTO sample_accounts VALUES
  (1, 'Alice', 1000.00),
  (2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
  (1, 1, 'deposit', 100.00);

Проверьте настройку:

SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

Выходные данные:

sample_accounts:
id	account_name	balance
1	  Alice	        1000.00
2	  Bob	          500.00

sample_transactions:
id	account_id	transaction_type	amount
1	           1	         deposit	100.00

Неинтерактивные транзакции

Неинтерактивные транзакции используют BEGIN ATOMIC ... END; синтаксис. Все операторы выполняются как одна атомарная единица. Если каждая инструкция выполнена успешно, Azure Databricks автоматически подтверждает транзакции. Если любая операция завершается ошибкой, Azure Databricks откатывает все изменения автоматически. Подробные сведения о синтаксисе и шаблонах использования см. в неинтерактивных транзакциях.

Выполнение успешной транзакции

Обновите обе таблицы атомарно:

BEGIN ATOMIC
  -- Update Alice's account balance
  UPDATE sample_accounts
  SET balance = balance + 100.00
  WHERE id = 1;

  -- Record the deposit transaction
  INSERT INTO sample_transactions
  VALUES (2, 1, 'deposit', 100.00);
END;

Убедитесь, что обе операции выполнены успешно:

-- Alice's balance should now be 1100.00
SELECT * FROM sample_accounts WHERE id = 1;

-- Should show two transaction records
SELECT * FROM sample_transactions;

Обновление баланса и запись транзакций были созданы вместе. Если бы любой запрос завершился сбоем, ни одно из изменений не было бы зафиксировано, и Databricks прервала бы транзакцию без побочных эффектов.

Использование SIGNAL для прерывания транзакции при выполнении условия

Вы можете использовать SIGNAL внутри BEGIN ATOMIC ... END; блока, чтобы завершить транзакцию, если определяемое пользователем условие не выполнено. Это полезно для проверки данных перед применением.

BEGIN ATOMIC
  -- Insert new account
  INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

  -- Fail the transaction if balance is negative
  IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
  END IF;
END;

SIGNAL вызывает программную ошибку, в результате чего вся транзакция автоматически откатывается. Убедитесь, что вставка была откатена:

-- Should return 0 rows (the transaction was rolled back by SIGNAL)
SELECT * FROM sample_accounts WHERE id = 3;

Просмотрите автоматическое восстановление системы при сбое

Запустите транзакцию с недопустимой инструкцией:

BEGIN ATOMIC
  -- This statement is valid
  INSERT INTO sample_accounts VALUES (4, 'David', 300.00);

  -- This statement will fail (table does not exist)
  INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

Транзакция завершается ошибкой. Убедитесь, что первая инструкция была откатена:

-- Should return 0 rows because the transaction was rolled back
SELECT * FROM sample_accounts WHERE id = 4;

Несмотря на то, что первая INSERT инструкция действительна, она была откатена, так как вторая инструкция завершилась ошибкой. Это демонстрирует гарантию выполнения транзакций в полном объеме или их отмены.

Интерактивные транзакции

Интерактивные транзакции предоставляют вам явный контроль над процессом фиксации изменений или их отката. Используйте BEGIN TRANSACTION для запуска, а затем ФИКСАЦИЯ для сохранения изменений или ROLLBACK для их отмены.

Фиксация изменений

Запуск транзакции:

BEGIN TRANSACTION;

Внесите изменения (пока не утверждены):

INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

Принять чтобы сделать изменения постоянными.

COMMIT;

Проверьте изменения:

-- Eve's account should now be visible
SELECT * FROM sample_accounts WHERE id = 5;

-- Bob's balance should be 550.00 (500 + 50)
SELECT * FROM sample_accounts WHERE id = 2;

Откат изменений

Запустите новую транзакцию:

BEGIN TRANSACTION;

Внесите изменения:

INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

Убедитесь, что изменение отображается в сеансе:

-- Should show Frank's account (visible in your session only)
SELECT * FROM sample_accounts WHERE id = 6;

Откат, чтобы отменить изменение:

ROLLBACK;

Убедитесь, что изменение было отменено:

-- Should return 0 rows (the insert was rolled back)
SELECT * FROM sample_accounts WHERE id = 6;

Использование с хранимыми процедурами и скриптами SQL

Транзакции можно объединить с хранимыми процедурами для создания логики повторно используемых транзакций. Этот шаблон полезен для сложных операций, которые часто выполняются.

  1. Создайте две таблицы с каталогом управляемыми коммитами включенными

    
    CREATE TABLE orders (order_id STRING, item_sku STRING, quantity_ordered INT)
     TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE inventory (item_sku STRING, quantity_in_stock INT)
     TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    
  2. Определение хранимой процедуры

    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
        IN  p_order_id      STRING,
        IN  p_customer_id   STRING,
        IN  p_order_amount  DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
        -- Insert the order
        INSERT INTO main.retail.orders (order_id, customer_id, amount)
        VALUES (p_order_id, p_customer_id, p_order_amount);
    
        -- Update total sales per customer
        MERGE INTO main.retail.total_sales AS t
        USING (
            SELECT
              p_customer_id  AS customer_id,
              p_order_amount AS order_amount
        ) s
          ON t.customer_id = s.customer_id
        WHEN MATCHED THEN
          UPDATE SET t.total_amount = t.total_amount + s.order_amount
        WHEN NOT MATCHED THEN
          INSERT (customer_id, total_amount)
          VALUES (s.customer_id, s.order_amount);
    END;
    
    
  3. Определение транзакции

    BEGIN ATOMIC
        -- Staging batch id for this transaction
        DECLARE new_order_id STRING DEFAULT uuid();
        DECLARE v_batch_id STRING DEFAULT uuid();
    
        -- 1) Stage incoming customer and order rows
        INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
        VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);
    
        -- 2) Drive final writes from staging to production via stored procedure
        FOR o AS
          SELECT
            order_id,
            customer_id,
            amount
          FROM main.retail.orders_staging
          WHERE batch_id = v_batch_id
        DO
            CALL main.retail.apply_order(
              o.order_id,
              o.customer_id,
              o.amount
            );
        END FOR;
    
        -- 3) Clean up processed staging rows
        DELETE FROM main.retail.orders_staging
        WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction
    
    

Если любая часть транзакции завершается ошибкой, Databricks откатывает все изменения автоматически.

Очистка

Удалите примеры таблиц:

DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;


DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS inventory;

Дальнейшие действия