Режимы транзакций

Это важно

Транзакции, записываемые в управляемые таблицы Delta каталога Unity, находятся в общедоступной предварительной версии.

Транзакции, записываемые в управляемые таблицы Iceberg каталога Unity, находятся в закрытой предварительной версии. Чтобы присоединиться к этой предварительной версии, отправьте форму регистрации на предварительный просмотр управляемых таблиц Iceberg.

Транзакции поддерживают два режима: неинтерактивные и интерактивные. Эта страница охватывает использование каждого режима и включает примеры реализации.

Сведения о требованиях и обзоре транзакций см. в разделе "Транзакции". Практические рекомендации с обоими режимами см. в руководстве по координации транзакций между таблицами.

Замечание

Все таблицы, записанные в многооперационную, многотабличную транзакцию, должны:

Неинтерактивные транзакции

Неинтерактивные транзакции используют скрипты SQL с ключевым словом ATOMIC . Блок составных инструкций ATOMIC выполняет все инструкции как единую атомарную единицу. Все достигают успеха вместе или терпят неудачу вместе.

Поддерживаемые вычислительные ресурсы: любое хранилище SQL, бессерверные вычисления или кластер под управлением Databricks Runtime 18.0 и более поздних версий.

Поддерживаемый синтаксис: SQL, блоки Scala spark.sql и блоки PySpark spark.sql.

Замечание

Вы можете использовать неинтерактивные транзакции в структурированной потоковой передаче forEachBatch , вызвав вызов spark.sql("BEGIN ATOMIC ... END;"). Однако структурированные контрольные точки стриминга не продвигаются транзакционно.

Синтаксис

BEGIN ATOMIC
  statement1;
  statement2;
  statement3;
END;

Azure Databricks автоматически фиксирует все изменения при успешном завершении всех инструкций. Если любая инструкция завершается ошибкой, Azure Databricks автоматически откатывает все изменения.

Использование в редакторе SQL

Выполнение неинтерактивных транзакций непосредственно в редакторе SQL. Выберите весь блок составных инструкций ATOMIC и запустите его в виде одной инструкции:

BEGIN ATOMIC
  DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;

  INSERT INTO staging_sales
  SELECT * FROM raw_sales WHERE load_date = current_date();

  MERGE INTO sales AS target
  USING staging_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Использование в записных книжках

Выполнение неинтерактивных транзакций в записных книжках с помощью ячеек SQL или программных API.

SQL

BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;

Python

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Scala

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Использование в запланированных заданиях

Неинтерактивные транзакции хорошо работают в запланированных заданиях, так как они автоматически обрабатывают фиксацию и откат:

BEGIN ATOMIC
  -- Clear previous staging data
  DELETE FROM staging_daily_sales WHERE load_date = current_date();

  -- Load new data
  INSERT INTO staging_daily_sales
  SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
  FROM raw_sales
  WHERE sale_date = current_date() - INTERVAL 1 DAY;

  -- Validate row count (fails transaction if no data)
  IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
  END IF;

  -- Merge into production
  MERGE INTO daily_sales AS target
  USING staging_daily_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Если любая инструкция завершается ошибкой, включая утверждение, вся транзакция откатывается автоматически.

Использование с JDBC

Внешние клиенты могут выполнять неинтерактивные транзакции.

JDBC

String sql = """
    BEGIN ATOMIC
      INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
      UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
    END;
    """;

Statement stmt = conn.createStatement();
stmt.execute(sql);

Использование с API выполнения инструкций

Выполнение неинтерактивных транзакций с помощью API выполнения инструкций:

import requests

sql = """
BEGIN ATOMIC
  INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
  UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""

response = requests.post(
    f"{workspace_url}/api/2.0/sql/statements",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "warehouse_id": warehouse_id,
        "statement": sql,
        "wait_timeout": "30s"
    }
)

Шаблоны ETL

В следующих шаблонах показаны распространенные рабочие процессы ETL с помощью неинтерактивных транзакций.

Шаблон инстанцирования и валидации

Этот шаблон загружает данные в промежуточную область, проверяет качество данных и объединяет проверенные записи в рабочие таблицы:

BEGIN ATOMIC
  -- Load into staging
  INSERT INTO staging_customers
  SELECT * FROM external_source
  WHERE ingest_date = current_date();

  -- Validate data quality
  IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
  END IF;

  -- Merge validated data
  MERGE INTO customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Update metadata
  UPDATE etl_metadata
  SET last_load_date = current_date(),
      rows_processed = (SELECT COUNT(*) FROM staging_customers)
  WHERE table_name = 'customers';
END;

Шаблон таблицы измерений и фактов

Этот шаблон обновляет таблицы измерений перед загрузкой таблиц фактов для обеспечения целостности ссылочных данных:

BEGIN ATOMIC
  -- Update dimension tables first
  MERGE INTO dim_products AS target
  USING staging_products AS source
  ON target.product_id = source.product_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  MERGE INTO dim_customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Then load fact table with foreign key references
  INSERT INTO fact_sales
  SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
  FROM staging_sales s
  JOIN dim_products p ON s.product_id = p.product_id
  JOIN dim_customers c ON s.customer_id = c.customer_id;
END;

Обработка ошибок

Если в блоке BEGIN ATOMIC ... END; возникает ошибка, Azure Databricks откатывает все изменения и возвращает сообщение об ошибке.

Советы по отладке:

  1. Просмотрите сообщение об ошибке, чтобы определить, какое утверждение вызвало ошибку.
  2. Тестируйте инструкции отдельно вне блока транзакций.
  3. Добавьте проверки, используя SIGNAL, чтобы выдать пользовательские сообщения об ошибках.
  4. Запросите историю транзакций для дополнительного контекста.

Интерактивные транзакции

Интерактивные транзакции предоставляют явный контроль над границами транзакций. Вы вручную начинаете транзакцию, выполняете инструкции и явно фиксируете или откатываете.

Поддерживаемые вычислительные ресурсы: только хранилища SQL .

Поддерживаемый синтаксис: только SQL.

Синтаксис

BEGIN TRANSACTION;

statement1;
statement2;

COMMIT;
-- or: ROLLBACK;

Проверка перед коммитом

Используйте интерактивные транзакции для проверки результатов перед подтверждением.

BEGIN TRANSACTION;

-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();

-- Validate and commit or rollback
BEGIN
  DECLARE duplicate_count INT;
  SET duplicate_count = (
    SELECT COUNT(*) FROM (
      SELECT customer_id, COUNT(*) as cnt
      FROM staging_customers
      WHERE load_date = current_date()
      GROUP BY customer_id
      HAVING COUNT(*) > 1
    )
  );

  IF duplicate_count > 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
  ELSE
    MERGE INTO customers AS target
    USING staging_customers AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;
    COMMIT;
  END IF;
END;

Явная отмена изменений

Откатить транзакцию в случае сбоя проверки или когда бизнес-логика требует отмены изменений.

BEGIN TRANSACTION;

UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;

-- Check if quantity would go negative
BEGIN
  DECLARE new_quantity INT;
  SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);

  IF new_quantity < 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
  ELSE
    COMMIT;
  END IF;
END;

Использование с JDBC

Драйвер JDBC поддерживает выполнение инструкций DML с использованием executeUpdate() в транзакциях. Список поддерживаемых инструкций DML см. в разделе "Поддерживаемые операции".

Клиенты JDBC используют интерактивные транзакции, отключая режим автоматической фиксации:

Connection conn = DriverManager.getConnection(jdbcUrl, properties);

try {
    conn.setAutoCommit(false);  // Start transaction mode
    Statement stmt = conn.createStatement();

    stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
    stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");

    conn.commit();  // Commit the transaction

} catch (SQLException e) {
    conn.rollback();  // Roll back on error
    throw e;
} finally {
    conn.close();
}

Неподдерживаемые операции JDBC

Следующие операции JDBC не поддерживаются в интерактивных транзакциях:

Категория Не поддерживаются
Переключение каталога или схемы Connection.setCatalog() и Connection.setSchema().
Изменения конфигурации сеанса Connection.setClientInfo() для свойств уровня сеанса, таких как TIMEZONE и ANSI_MODE
All DatabaseMetaData (все протоколы) Все DatabaseMetaData.* методы
Метаданные PreparedStatement PreparedStatement.getMetaData()
Хранимые процедуры CALL procedure_name()

Использование с ODBC

Драйвер ODBC поддерживает выполнение инструкций DML с помощью SQLExecute() и SQLExecDirect() внутри транзакций. Список поддерживаемых инструкций DML см. в разделе "Поддерживаемые операции".

Клиенты ODBC могут использовать интерактивные транзакции с драйвером ODBC Azure Databricks с помощью стандартных функций управления транзакциями ODBC.

Замечание

AutoCommit необходимо отключить для использования транзакций. UseNativeQuery необходимо установить в 1 для отключения AutoCommit в процессе выполнения.

Неподдерживаемые операции ODBC

Следующие операции ODBC не поддерживаются в интерактивных транзакциях:

Категория Не поддерживаются
Все функции каталога SQLTables, SQLColumns, SQLStatistics, SQLSpecialColumns, SQLPrimaryKeys, SQLForeignKeys, SQLTablePrivileges, SQLColumnPrivileges, SQLProcedures, SQLProcedureColumns
Настройка атрибутов подключения Переключение каталога, изменение уровня изоляции и изменение режима доступа с помощью SQLSetConnectAttr()
Перевод SQL SQLNativeSql

Используйте с Databricks SQL Connector для Python

Соединитель SQL Databricks для Python поддерживает выполнение инструкций DML с помощью cursor.execute() в транзакциях. Список поддерживаемых инструкций DML см. в разделе "Поддерживаемые операции".

Приложения на Python могут использовать интерактивные транзакции с коннектором Databricks SQL для Python путем установки autocommit=False:

from databricks import sql

with sql.connect(
    server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
    http_path="sql/1.0/warehouses/abc123def456",
    access_token="your-access-token",
    autocommit=False
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
        connection.commit()

Операции соединителя Python не поддерживаются

Следующие операции соединителя Python не поддерживаются в интерактивных транзакциях:

Категория Не поддерживаются
Все метаданные cursor.catalogs(), cursor.schemas()cursor.tables()cursor.columns()

Ограничения драйверов для интерактивных транзакций

Следующие ограничения применяются ко всем драйверам при использовании интерактивных транзакций.

Операции метаданных не поддерживаются внутри интерактивных транзакций. Следующие операции могут завершиться ошибкой в транзакции независимо от драйвера или протокола:

Драйвер или протокол Тип Методы
JDBC DatabaseMetaData getCatalogs(), getSchemas(), , getTables(), getColumns()getTypeInfo()
ODBC Функции каталога SQLTables, SQLColumns, SQLGetTypeInfo
соединитель Python Методы метаданных cursor.catalogs(), cursor.schemas()cursor.tables()cursor.columns()
SQL Команды метаданных SHOW TABLES, SHOW DATABASES, , DESCRIBE TABLE, USE CATALOGUSE SCHEMA
SQL information_schema SELECTзапросы к таблицам information_schema

Выполнение всех операций метаданных за пределами транзакций.

Предупреждение

Выполнение транзакций в нескольких потоках в одном объекте подключения драйвера приводит к неопределенному поведению. Выполняйте только одну транзакцию за раз в каждом объекте подключения.

Поведение изоляции

Незафиксированные изменения в интерактивной транзакции видны только в вашем сеансе. Другие сеансы видят прежнее состояние таблицы до начала вашей транзакции.

Замечание

Интерактивные транзакции используют более консервативное обнаружение конфликтов, чем неинтерактивные транзакции, и могут конфликтовать на уровне таблицы (за исключением безусловных вставок). Для обнаружения конфликтов на уровне строк используйте неинтерактивные транзакции (BEGIN ATOMIC ... END;).

  1. Чтобы проверить изоляцию, создайте пример таблицы, если она не существует:
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
  1. В этом же сеансе запустите транзакцию и внесите изменения:

    BEGIN TRANSACTION;
    INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);
    
  2. На отдельной вкладке редактора SQL или сеансе записной книжки (не новой ячейке в той же записной книжке) запросите таблицу:

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    При этом возвращается 0 строк, так как незафиксированные изменения не отображаются за пределами первого сеанса.

  3. Вернитесь к первому сеансу и зафиксируйте:

    COMMIT;
    
  4. Повторите запрос из второго сеанса:

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    Строка отображается, так как транзакция зафиксирована.

Эта изоляция предотвращает другим пользователям чтение данных, которые могут быть отменены.

Выбор режима транзакции

Сценарий Рекомендуемый режим
Запланированные задания ETL Неинтерактивная— автоматическая фиксация или откат упрощает обработку ошибок
Фиксированные последовательности инструкций Неинтерактивный — более простой синтаксис, не требуется ручной коммит
Проверка данных перед коммитом Интерактивный осмотр результатов и принятие решения о фиксации
Приложения JDBC, требующие ручного управления Интерактивные — стандартные шаблоны транзакций базы данных

Дальнейшие действия