Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Это важно
Транзакции, записываемые в управляемые таблицы Delta каталога Unity, находятся в общедоступной предварительной версии.
Транзакции, записываемые в управляемые таблицы Iceberg каталога Unity, находятся в закрытой предварительной версии. Чтобы присоединиться к этой предварительной версии, отправьте форму регистрации на предварительный просмотр управляемых таблиц Iceberg.
Транзакции поддерживают два режима: неинтерактивные и интерактивные. Эта страница охватывает использование каждого режима и включает примеры реализации.
Сведения о требованиях и обзоре транзакций см. в разделе "Транзакции". Практические рекомендации с обоими режимами см. в руководстве по координации транзакций между таблицами.
Замечание
Все таблицы, записанные в многооперационную, многотабличную транзакцию, должны:
- Управляемые таблицы каталога Unity (Delta или Iceberg)
- Включите фиксации каталога
Неинтерактивные транзакции
Неинтерактивные транзакции используют скрипты SQL с ключевым словом ATOMIC . Блок составных инструкций ATOMIC выполняет все инструкции как единую атомарную единицу. Все достигают успеха вместе или терпят неудачу вместе.
Поддерживаемые вычислительные ресурсы: любое хранилище SQL, бессерверные вычисления или кластер под управлением Databricks Runtime 18.0 и более поздних версий.
Поддерживаемый синтаксис: SQL, блоки Scala spark.sql и блоки PySpark spark.sql.
Замечание
Вы можете использовать неинтерактивные транзакции в структурированной потоковой передаче forEachBatch , вызвав вызов spark.sql("BEGIN ATOMIC ... END;"). Однако структурированные контрольные точки стриминга не продвигаются транзакционно.
Синтаксис
BEGIN ATOMIC
statement1;
statement2;
statement3;
END;
Azure Databricks автоматически фиксирует все изменения при успешном завершении всех инструкций. Если любая инструкция завершается ошибкой, Azure Databricks автоматически откатывает все изменения.
Использование в редакторе SQL
Выполнение неинтерактивных транзакций непосредственно в редакторе SQL. Выберите весь блок составных инструкций ATOMIC и запустите его в виде одной инструкции:
BEGIN ATOMIC
DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;
INSERT INTO staging_sales
SELECT * FROM raw_sales WHERE load_date = current_date();
MERGE INTO sales AS target
USING staging_sales AS source
ON target.sale_id = source.sale_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
END;
Использование в записных книжках
Выполнение неинтерактивных транзакций в записных книжках с помощью ячеек SQL или программных API.
SQL
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
Python
spark.sql("""
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
""")
Scala
spark.sql("""
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
""")
Использование в запланированных заданиях
Неинтерактивные транзакции хорошо работают в запланированных заданиях, так как они автоматически обрабатывают фиксацию и откат:
BEGIN ATOMIC
-- Clear previous staging data
DELETE FROM staging_daily_sales WHERE load_date = current_date();
-- Load new data
INSERT INTO staging_daily_sales
SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
FROM raw_sales
WHERE sale_date = current_date() - INTERVAL 1 DAY;
-- Validate row count (fails transaction if no data)
IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
END IF;
-- Merge into production
MERGE INTO daily_sales AS target
USING staging_daily_sales AS source
ON target.sale_id = source.sale_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
END;
Если любая инструкция завершается ошибкой, включая утверждение, вся транзакция откатывается автоматически.
Использование с JDBC
Внешние клиенты могут выполнять неинтерактивные транзакции.
JDBC
String sql = """
BEGIN ATOMIC
INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
END;
""";
Statement stmt = conn.createStatement();
stmt.execute(sql);
Использование с API выполнения инструкций
Выполнение неинтерактивных транзакций с помощью API выполнения инструкций:
import requests
sql = """
BEGIN ATOMIC
INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""
response = requests.post(
f"{workspace_url}/api/2.0/sql/statements",
headers={"Authorization": f"Bearer {token}"},
json={
"warehouse_id": warehouse_id,
"statement": sql,
"wait_timeout": "30s"
}
)
Шаблоны ETL
В следующих шаблонах показаны распространенные рабочие процессы ETL с помощью неинтерактивных транзакций.
Шаблон инстанцирования и валидации
Этот шаблон загружает данные в промежуточную область, проверяет качество данных и объединяет проверенные записи в рабочие таблицы:
BEGIN ATOMIC
-- Load into staging
INSERT INTO staging_customers
SELECT * FROM external_source
WHERE ingest_date = current_date();
-- Validate data quality
IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
END IF;
-- Merge validated data
MERGE INTO customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Update metadata
UPDATE etl_metadata
SET last_load_date = current_date(),
rows_processed = (SELECT COUNT(*) FROM staging_customers)
WHERE table_name = 'customers';
END;
Шаблон таблицы измерений и фактов
Этот шаблон обновляет таблицы измерений перед загрузкой таблиц фактов для обеспечения целостности ссылочных данных:
BEGIN ATOMIC
-- Update dimension tables first
MERGE INTO dim_products AS target
USING staging_products AS source
ON target.product_id = source.product_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
MERGE INTO dim_customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Then load fact table with foreign key references
INSERT INTO fact_sales
SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
FROM staging_sales s
JOIN dim_products p ON s.product_id = p.product_id
JOIN dim_customers c ON s.customer_id = c.customer_id;
END;
Обработка ошибок
Если в блоке BEGIN ATOMIC ... END; возникает ошибка, Azure Databricks откатывает все изменения и возвращает сообщение об ошибке.
Советы по отладке:
- Просмотрите сообщение об ошибке, чтобы определить, какое утверждение вызвало ошибку.
- Тестируйте инструкции отдельно вне блока транзакций.
- Добавьте проверки, используя
SIGNAL, чтобы выдать пользовательские сообщения об ошибках. - Запросите историю транзакций для дополнительного контекста.
Интерактивные транзакции
Интерактивные транзакции предоставляют явный контроль над границами транзакций. Вы вручную начинаете транзакцию, выполняете инструкции и явно фиксируете или откатываете.
Поддерживаемые вычислительные ресурсы: только хранилища SQL .
Поддерживаемый синтаксис: только SQL.
Синтаксис
BEGIN TRANSACTION;
statement1;
statement2;
COMMIT;
-- or: ROLLBACK;
Проверка перед коммитом
Используйте интерактивные транзакции для проверки результатов перед подтверждением.
BEGIN TRANSACTION;
-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();
-- Validate and commit or rollback
BEGIN
DECLARE duplicate_count INT;
SET duplicate_count = (
SELECT COUNT(*) FROM (
SELECT customer_id, COUNT(*) as cnt
FROM staging_customers
WHERE load_date = current_date()
GROUP BY customer_id
HAVING COUNT(*) > 1
)
);
IF duplicate_count > 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
ELSE
MERGE INTO customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
COMMIT;
END IF;
END;
Явная отмена изменений
Откатить транзакцию в случае сбоя проверки или когда бизнес-логика требует отмены изменений.
BEGIN TRANSACTION;
UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;
-- Check if quantity would go negative
BEGIN
DECLARE new_quantity INT;
SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);
IF new_quantity < 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
ELSE
COMMIT;
END IF;
END;
Использование с JDBC
Драйвер JDBC поддерживает выполнение инструкций DML с использованием executeUpdate() в транзакциях. Список поддерживаемых инструкций DML см. в разделе "Поддерживаемые операции".
Клиенты JDBC используют интерактивные транзакции, отключая режим автоматической фиксации:
Connection conn = DriverManager.getConnection(jdbcUrl, properties);
try {
conn.setAutoCommit(false); // Start transaction mode
Statement stmt = conn.createStatement();
stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");
conn.commit(); // Commit the transaction
} catch (SQLException e) {
conn.rollback(); // Roll back on error
throw e;
} finally {
conn.close();
}
Неподдерживаемые операции JDBC
Следующие операции JDBC не поддерживаются в интерактивных транзакциях:
| Категория | Не поддерживаются |
|---|---|
| Переключение каталога или схемы |
Connection.setCatalog() и Connection.setSchema(). |
| Изменения конфигурации сеанса |
Connection.setClientInfo() для свойств уровня сеанса, таких как TIMEZONE и ANSI_MODE |
| All DatabaseMetaData (все протоколы) | Все DatabaseMetaData.* методы |
| Метаданные PreparedStatement | PreparedStatement.getMetaData() |
| Хранимые процедуры | CALL procedure_name() |
Использование с ODBC
Драйвер ODBC поддерживает выполнение инструкций DML с помощью SQLExecute() и SQLExecDirect() внутри транзакций. Список поддерживаемых инструкций DML см. в разделе "Поддерживаемые операции".
Клиенты ODBC могут использовать интерактивные транзакции с драйвером ODBC Azure Databricks с помощью стандартных функций управления транзакциями ODBC.
Замечание
AutoCommit необходимо отключить для использования транзакций.
UseNativeQuery необходимо установить в 1 для отключения AutoCommit в процессе выполнения.
Неподдерживаемые операции ODBC
Следующие операции ODBC не поддерживаются в интерактивных транзакциях:
| Категория | Не поддерживаются |
|---|---|
| Все функции каталога |
SQLTables, SQLColumns, SQLStatistics, SQLSpecialColumns, SQLPrimaryKeys, SQLForeignKeys, SQLTablePrivileges, SQLColumnPrivileges, SQLProcedures, SQLProcedureColumns |
| Настройка атрибутов подключения | Переключение каталога, изменение уровня изоляции и изменение режима доступа с помощью SQLSetConnectAttr() |
| Перевод SQL | SQLNativeSql |
Используйте с Databricks SQL Connector для Python
Соединитель SQL Databricks для Python поддерживает выполнение инструкций DML с помощью cursor.execute() в транзакциях. Список поддерживаемых инструкций DML см. в разделе "Поддерживаемые операции".
Приложения на Python могут использовать интерактивные транзакции с коннектором Databricks SQL для Python путем установки autocommit=False:
from databricks import sql
with sql.connect(
server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
http_path="sql/1.0/warehouses/abc123def456",
access_token="your-access-token",
autocommit=False
) as connection:
with connection.cursor() as cursor:
cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
connection.commit()
Операции соединителя Python не поддерживаются
Следующие операции соединителя Python не поддерживаются в интерактивных транзакциях:
| Категория | Не поддерживаются |
|---|---|
| Все метаданные |
cursor.catalogs(), cursor.schemas()cursor.tables()cursor.columns() |
Ограничения драйверов для интерактивных транзакций
Следующие ограничения применяются ко всем драйверам при использовании интерактивных транзакций.
Операции метаданных не поддерживаются внутри интерактивных транзакций. Следующие операции могут завершиться ошибкой в транзакции независимо от драйвера или протокола:
| Драйвер или протокол | Тип | Методы |
|---|---|---|
| JDBC | DatabaseMetaData |
getCatalogs(), getSchemas(), , getTables(), getColumns()getTypeInfo() |
| ODBC | Функции каталога |
SQLTables, SQLColumns, SQLGetTypeInfo |
| соединитель Python | Методы метаданных |
cursor.catalogs(), cursor.schemas()cursor.tables()cursor.columns() |
| SQL | Команды метаданных |
SHOW TABLES, SHOW DATABASES, , DESCRIBE TABLE, USE CATALOGUSE SCHEMA |
| SQL | information_schema |
SELECTзапросы к таблицам information_schema |
Выполнение всех операций метаданных за пределами транзакций.
Предупреждение
Выполнение транзакций в нескольких потоках в одном объекте подключения драйвера приводит к неопределенному поведению. Выполняйте только одну транзакцию за раз в каждом объекте подключения.
Поведение изоляции
Незафиксированные изменения в интерактивной транзакции видны только в вашем сеансе. Другие сеансы видят прежнее состояние таблицы до начала вашей транзакции.
Замечание
Интерактивные транзакции используют более консервативное обнаружение конфликтов, чем неинтерактивные транзакции, и могут конфликтовать на уровне таблицы (за исключением безусловных вставок). Для обнаружения конфликтов на уровне строк используйте неинтерактивные транзакции (BEGIN ATOMIC ... END;).
- Чтобы проверить изоляцию, создайте пример таблицы, если она не существует:
CREATE TABLE IF NOT EXISTS sample_accounts (
id INT,
account_name STRING,
balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
В этом же сеансе запустите транзакцию и внесите изменения:
BEGIN TRANSACTION; INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);На отдельной вкладке редактора SQL или сеансе записной книжки (не новой ячейке в той же записной книжке) запросите таблицу:
-- Run this in the SECOND session SELECT * FROM sample_accounts WHERE id = 10;При этом возвращается 0 строк, так как незафиксированные изменения не отображаются за пределами первого сеанса.
Вернитесь к первому сеансу и зафиксируйте:
COMMIT;Повторите запрос из второго сеанса:
-- Run this in the SECOND session SELECT * FROM sample_accounts WHERE id = 10;Строка отображается, так как транзакция зафиксирована.
Эта изоляция предотвращает другим пользователям чтение данных, которые могут быть отменены.
Выбор режима транзакции
| Сценарий | Рекомендуемый режим |
|---|---|
| Запланированные задания ETL | Неинтерактивная— автоматическая фиксация или откат упрощает обработку ошибок |
| Фиксированные последовательности инструкций | Неинтерактивный — более простой синтаксис, не требуется ручной коммит |
| Проверка данных перед коммитом | Интерактивный осмотр результатов и принятие решения о фиксации |
| Приложения JDBC, требующие ручного управления | Интерактивные — стандартные шаблоны транзакций базы данных |
Дальнейшие действия
- Руководство. Координация транзакций между таблицами
- Транзакции
- Каталог коммитов
- Уровни изоляции и конфликты записи
Справочник по SQL
- Составная инструкция ATOMIC (неинтерактивные транзакции) — выполнение нескольких инструкций SQL в виде одной атомарной транзакции с автоматической фиксацией и откатом.
- BEGIN TRANSACTION (интерактивные транзакции): начать интерактивную транзакцию с помощью ручного управления фиксацией и откатом.
- COMMIT: фиксирует интерактивную транзакцию и делает все изменения постоянными.
- ROLLBACK: отмена интерактивной транзакции и всех изменений.