Compartilhar via


Modos de transação

Importante

As transações que gravam em tabelas Delta gerenciadas pelo Catálogo Unity estão em Visualização Pública.

As transações que gravam em tabelas Iceberg gerenciadas pelo Unity Catalog estão em Versão Prévia Privada. Para ingressar nesta prévia, envie o formulário de inscrição para prévia das tabelas Iceberg gerenciadas.

As transações dão suporte a dois modos: não interativos e interativos. Esta página aborda quando usar cada modo e inclui exemplos de implementação.

Para obter requisitos e uma visão geral das transações, consulte Transações. Para prática prática com ambos os modelos, consulte Tutorial: Coordenar transações entre tabelas.

Observação

Todas as tabelas gravadas em uma transação com várias instruções e várias tabelas devem:

Transações não interativas

Transações não interativas usam script SQL com a ATOMIC palavra-chave. O bloco de declaração composta ATOMIC executa todas as instruções como uma única unidade atômica. Todos são bem-sucedidos juntos ou todos falham juntos.

Computação com suporte: qualquer sql warehouse, computação sem servidor ou cluster executando o Databricks Runtime 18.0 e superior.

Sintaxe com suporte: dá suporte a blocos SQL, Scala spark.sql e PySpark spark.sql .

Observação

Você pode usar transações não interativas no Structured Streaming's forEachBatch chamando spark.sql("BEGIN ATOMIC ... END;"). No entanto, os pontos de verificação do Structured Streaming não avançam de forma transacional.

Sintaxe

BEGIN ATOMIC
  statement1;
  statement2;
  statement3;
END;

O Azure Databricks confirma automaticamente todas as alterações se todas as instruções forem bem-sucedidas. Se alguma instrução falhar, o Azure Databricks reverterá automaticamente todas as alterações.

Usar no editor do SQL

Execute transações não interativas diretamente no Editor do SQL. Selecione todo o bloco de instrução composta ATOMIC e execute-o como uma única instrução:

BEGIN ATOMIC
  DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;

  INSERT INTO staging_sales
  SELECT * FROM raw_sales WHERE load_date = current_date();

  MERGE INTO sales AS target
  USING staging_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Utilização em notebooks

Execute transações não interativas em notebooks usando células SQL ou APIs programáticas.

SQL

BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;

Python

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Scala

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Usar em trabalhos agendados

As transações não interativas funcionam bem em trabalhos agendados porque lidam automaticamente com confirmação e reversão:

BEGIN ATOMIC
  -- Clear previous staging data
  DELETE FROM staging_daily_sales WHERE load_date = current_date();

  -- Load new data
  INSERT INTO staging_daily_sales
  SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
  FROM raw_sales
  WHERE sale_date = current_date() - INTERVAL 1 DAY;

  -- Validate row count (fails transaction if no data)
  IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
  END IF;

  -- Merge into production
  MERGE INTO daily_sales AS target
  USING staging_daily_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Se alguma instrução falhar, incluindo a asserção, toda a transação será revertída automaticamente.

Uso com JDBC

Os clientes externos podem executar transações não interativas.

JDBC

String sql = """
    BEGIN ATOMIC
      INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
      UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
    END;
    """;

Statement stmt = conn.createStatement();
stmt.execute(sql);

Usar com a API de Execução de Comandos

Execute transações não interativas usando a API de Execução de Declaração:

import requests

sql = """
BEGIN ATOMIC
  INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
  UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""

response = requests.post(
    f"{workspace_url}/api/2.0/sql/statements",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "warehouse_id": warehouse_id,
        "statement": sql,
        "wait_timeout": "30s"
    }
)

Padrões ETL

Os padrões a seguir demonstram fluxos de trabalho de ETL comuns usando transações não interativas.

Padrão de preparo e validação

Esse padrão carrega dados em uma área de preparo, valida a qualidade dos dados e mescla registros validados em tabelas de produção:

BEGIN ATOMIC
  -- Load into staging
  INSERT INTO staging_customers
  SELECT * FROM external_source
  WHERE ingest_date = current_date();

  -- Validate data quality
  IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
  END IF;

  -- Merge validated data
  MERGE INTO customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Update metadata
  UPDATE etl_metadata
  SET last_load_date = current_date(),
      rows_processed = (SELECT COUNT(*) FROM staging_customers)
  WHERE table_name = 'customers';
END;

Padrão de tabela de dimensões e fatos

Esse padrão atualiza tabelas de dimensão antes de carregar tabelas de fatos para manter a integridade referencial:

BEGIN ATOMIC
  -- Update dimension tables first
  MERGE INTO dim_products AS target
  USING staging_products AS source
  ON target.product_id = source.product_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  MERGE INTO dim_customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Then load fact table with foreign key references
  INSERT INTO fact_sales
  SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
  FROM staging_sales s
  JOIN dim_products p ON s.product_id = p.product_id
  JOIN dim_customers c ON s.customer_id = c.customer_id;
END;

Tratamento de erros

Quando uma instrução falha em um BEGIN ATOMIC ... END; bloco, o Azure Databricks reverte todas as alterações e retorna uma mensagem de erro.

Dicas de depuração:

  1. Examine a mensagem de erro para identificar qual instrução falhou.
  2. Teste as instruções individualmente fora do bloco de transação.
  3. Adicione verificações de validação usando SIGNAL para gerar mensagens de erro personalizadas.
  4. Consultar o histórico de transações para contexto adicional.

Transações interativas

Transações interativas fornecem controle explícito sobre limites de transação. Você inicia manualmente uma transação, executa instruções e confirma ou reverte explicitamente.

Recursos computacionais suportados: Somente SQL Warehouses.

Sintaxe com suporte: somente SQL.

Sintaxe

BEGIN TRANSACTION;

statement1;
statement2;

COMMIT;
-- or: ROLLBACK;

Validar antes de confirmar

Use transações interativas para validar os resultados antes de confirmar:

BEGIN TRANSACTION;

-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();

-- Validate and commit or rollback
BEGIN
  DECLARE duplicate_count INT;
  SET duplicate_count = (
    SELECT COUNT(*) FROM (
      SELECT customer_id, COUNT(*) as cnt
      FROM staging_customers
      WHERE load_date = current_date()
      GROUP BY customer_id
      HAVING COUNT(*) > 1
    )
  );

  IF duplicate_count > 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
  ELSE
    MERGE INTO customers AS target
    USING staging_customers AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;
    COMMIT;
  END IF;
END;

Reversão explícita

Reverta uma transação quando a validação falhar ou a lógica de negócios exigir o descarte de alterações:

BEGIN TRANSACTION;

UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;

-- Check if quantity would go negative
BEGIN
  DECLARE new_quantity INT;
  SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);

  IF new_quantity < 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
  ELSE
    COMMIT;
  END IF;
END;

Uso com JDBC

O driver JDBC dá suporte à execução de instruções DML usando executeUpdate() em transações. Para obter uma lista de instruções DML com suporte, consulte operações com suporte.

Os clientes JDBC usam transações interativas desabilitando o modo de confirmação automática:

Connection conn = DriverManager.getConnection(jdbcUrl, properties);

try {
    conn.setAutoCommit(false);  // Start transaction mode
    Statement stmt = conn.createStatement();

    stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
    stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");

    conn.commit();  // Commit the transaction

} catch (SQLException e) {
    conn.rollback();  // Roll back on error
    throw e;
} finally {
    conn.close();
}

Operações JDBC sem suporte

As seguintes operações JDBC não têm suporte em transações interativas:

Categoria Sem suporte
Alternância de catálogo ou esquema Connection.setCatalog() e Connection.setSchema()
Alterações na configuração da sessão Connection.setClientInfo() para propriedades de nível de sessão, como TIMEZONE e ANSI_MODE
Todos os DatabaseMetaData (todos os protocolos) Todos os DatabaseMetaData.* métodos
Metadados do PreparedStatement PreparedStatement.getMetaData()
Procedimentos armazenados CALL procedure_name()

Usar com ODBC

O driver ODBC dá suporte à execução de instruções DML usando SQLExecute() e SQLExecDirect() dentro de transações. Para obter uma lista de instruções DML com suporte, consulte operações com suporte.

Os clientes ODBC podem usar transações interativas com o driver ODBC do Azure Databricks usando funções de gerenciamento de transações ODBC padrão.

Operações ODBC sem suporte

As seguintes operações ODBC não têm suporte em transações interativas:

Categoria Sem suporte
Todas as funções de catálogo SQLTables, SQLColumns, SQLStatistics, SQLSpecialColumns, SQLPrimaryKeys, , SQLForeignKeys, SQLTablePrivileges, , SQLColumnPrivileges, SQLProcedures, SQLProcedureColumns
Definindo atributos de conexão Mudança de catálogo, alterações no nível de isolamento e alterações no modo de acesso usando SQLSetConnectAttr()
Tradução de SQL SQLNativeSql

Uso com o Conector SQL do Databricks para Python

O Conector SQL do Databricks para Python suporta a execução de instruções DML usando cursor.execute() dentro de transações. Para obter uma lista de instruções DML com suporte, consulte operações com suporte.

Os aplicativos Python podem usar transações interativas com o Conector do SQL do Databricks para Python definindo autocommit=False:

from databricks import sql

with sql.connect(
    server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
    http_path="sql/1.0/warehouses/abc123def456",
    access_token="your-access-token",
    autocommit=False
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
        connection.commit()

Operações não suportadas pelo conector Python

As seguintes operações de conector python não têm suporte em transações interativas:

Categoria Sem suporte
Todos os metadados cursor.catalogs(), cursor.schemas(), , cursor.tables()cursor.columns()

Limitações de driver para transações interativas

As limitações a seguir se aplicam a todos os drivers ao usar transações interativas.

Não há suporte para operações de metadados dentro de transações interativas. As seguintes operações podem falhar dentro de uma transação, independentemente do driver ou protocolo:

Driver/Protocolo Tipo Methods
JDBC DatabaseMetaData getCatalogs(), getSchemas(), getTables(), , getColumns()getTypeInfo()
ODBC Funções de catálogo SQLTables, SQLColumns, SQLGetTypeInfo
Conector do Python Métodos de metadados cursor.catalogs(), cursor.schemas(), , cursor.tables()cursor.columns()
SQL Comandos de metadados SHOW TABLES, SHOW DATABASES, DESCRIBE TABLE, , USE CATALOGUSE SCHEMA
SQL information_schema SELECTconsultas em tabelas information_schema

Execute todas as operações de metadados fora das transações.

Aviso

A execução de transações em vários threads em um único objeto de conexão de driver resulta em comportamento indefinido. Execute apenas uma transação de cada vez em cada objeto de conexão.

Comportamento de isolamento

Alterações não confirmadas em uma transação interativa só são visíveis para sua sessão. Outras sessões veem o estado da tabela como era antes do início da transação.

Observação

Transações interativas usam detecção de conflitos mais conservadora do que transações não interativas e podem ter conflitos no nível da tabela (exceto para acréscimos incondicionais). Para detecção de conflitos em nível de linha, use transações não interativas (BEGIN ATOMIC ... END;).

  1. Para verificar o isolamento, crie a tabela de exemplo se ela não existir:
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
  1. Nessa mesma sessão, inicie uma transação e faça uma alteração:

    BEGIN TRANSACTION;
    INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);
    
  2. Em uma guia ou sessão de notebook separada do Editor de SQL (não uma nova célula no mesmo notebook), faça uma consulta à tabela:

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    Isso retorna 0 linhas porque a alteração não confirmada não está visível fora da primeira sessão.

  3. Retorne à sua primeira sessão e confirme:

    COMMIT;
    
  4. Consulta da segunda sessão novamente:

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    A linha está visível porque a transação foi efetivada.

Esse isolamento impede que outros usuários leiam dados que podem ser revertidos.

Escolher um modo de transação

Scenario Modo recomendado
Trabalhos de ETL agendados Não interativo – confirmação ou reversão automática simplifica o tratamento de erros
Sequências de declarações fixas Não interativo – sintaxe mais simples, sem necessidade de confirmação manual
Validação de dados antes da confirmação Interativo – inspecionar os resultados e decidir se deseja confirmar
Aplicativos JDBC que precisam de controle manual Interativo – padrões de transação de banco de dados padrão

Próximas Etapas