Partilhar via


Modos de transação

Importante

As transações que escrevem em tabelas Delta administradas pelo Unity Catalog encontram-se em Prévia Pública.

As transações que escrevem em tabelas Iceberg geridas pelo Catálogo Unity encontram-se em Prévia Privada. Para aderir a esta pré-visualização, submeta o formulário de inscrição de pré-visualização gerida das tabelas Iceberg.

As transações suportam dois modos: não interativo e interativo. Esta página explica quando usar cada modo e inclui exemplos de implementação.

Para requisitos e uma visão geral das transações, consulte Transações. Para prática prática com ambos os modos, veja Tutorial: Coordenar transações entre tabelas.

Observação

Todas as tabelas escritas numa transação com múltiplas instruções e múltiplas tabelas devem:

Transações não interativas

Transações não interativas utilizam scripts SQL com a ATOMIC palavra-chave. O bloco de instruções compostas ATOMIC executa todas as instruções como uma única unidade atómica. Todos têm sucesso juntos ou todos falham juntos.

Computação suportada: Qualquer SQL warehouse, computação serverless ou cluster que execute Databricks Runtime 18.0 e superior.

Sintaxe suportada: Suporta SQL, blocos Scala spark.sql e blocos PySpark spark.sql .

Observação

Pode-se usar transações não interativas no forEachBatch Structured Streaming, chamando spark.sql("BEGIN ATOMIC ... END;"). No entanto, os checkpoints de Streaming Estruturado não avançam de forma transacional.

Sintaxe

BEGIN ATOMIC
  statement1;
  statement2;
  statement3;
END;

O Azure Databricks confirma automaticamente todas as alterações se todas as instruções tiverem sucesso. Se alguma instrução falhar, o Azure Databricks reverte automaticamente todas as alterações.

Utilização no editor SQL

Execute transações não interativas diretamente no Editor SQL. Seleciona todo o bloco de instruções compostas ATOMIC e executa-o como uma única instrução:

BEGIN ATOMIC
  DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;

  INSERT INTO staging_sales
  SELECT * FROM raw_sales WHERE load_date = current_date();

  MERGE INTO sales AS target
  USING staging_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Utilização em cadernos

Execute transações não interativas em cadernos usando células SQL ou APIs programáticas.

SQL

BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;

Python

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Scala

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Utilização em tarefas agendadas

Transações não interativas funcionam bem em trabalhos agendados porque tratam automaticamente de commit e rollback:

BEGIN ATOMIC
  -- Clear previous staging data
  DELETE FROM staging_daily_sales WHERE load_date = current_date();

  -- Load new data
  INSERT INTO staging_daily_sales
  SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
  FROM raw_sales
  WHERE sale_date = current_date() - INTERVAL 1 DAY;

  -- Validate row count (fails transaction if no data)
  IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
  END IF;

  -- Merge into production
  MERGE INTO daily_sales AS target
  USING staging_daily_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Se alguma instrução falhar, incluindo a asserção, toda a transação reverte automaticamente.

Uso com JDBC

Clientes externos podem executar transações não interativas.

JDBC

String sql = """
    BEGIN ATOMIC
      INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
      UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
    END;
    """;

Statement stmt = conn.createStatement();
stmt.execute(sql);

Uso com API de Execução de Declarações

Execute transações não interativas usando a API de Execução de Instruções:

import requests

sql = """
BEGIN ATOMIC
  INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
  UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""

response = requests.post(
    f"{workspace_url}/api/2.0/sql/statements",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "warehouse_id": warehouse_id,
        "statement": sql,
        "wait_timeout": "30s"
    }
)

Padrões ETL

Os padrões seguintes demonstram fluxos de trabalho ETL comuns usando transações não interativas.

Padrão de staging e validação

Este padrão carrega os dados numa área de staging, valida a qualidade dos dados e funde os registos validados em tabelas de produção:

BEGIN ATOMIC
  -- Load into staging
  INSERT INTO staging_customers
  SELECT * FROM external_source
  WHERE ingest_date = current_date();

  -- Validate data quality
  IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
  END IF;

  -- Merge validated data
  MERGE INTO customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Update metadata
  UPDATE etl_metadata
  SET last_load_date = current_date(),
      rows_processed = (SELECT COUNT(*) FROM staging_customers)
  WHERE table_name = 'customers';
END;

Padrão de tabela de dimensão e fatos

Este padrão atualiza as tabelas de dimensões antes de carregar tabelas de factos para manter a integridade referencial:

BEGIN ATOMIC
  -- Update dimension tables first
  MERGE INTO dim_products AS target
  USING staging_products AS source
  ON target.product_id = source.product_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  MERGE INTO dim_customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Then load fact table with foreign key references
  INSERT INTO fact_sales
  SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
  FROM staging_sales s
  JOIN dim_products p ON s.product_id = p.product_id
  JOIN dim_customers c ON s.customer_id = c.customer_id;
END;

Tratamento de erros

Quando uma instrução falha dentro de um BEGIN ATOMIC ... END; bloco, o Azure Databricks reverte todas as alterações e devolve uma mensagem de erro.

Dicas para depuração:

  1. Revise a mensagem de erro para identificar qual a instrução que falhou.
  2. Teste instruções individualmente fora do bloco de transação.
  3. Adicionar verificações de validação usando SIGNAL para falhar com mensagens de erro personalizadas.
  4. Consulte o histórico de transações para obter contexto adicional.

Transações interativas

As transações interativas dão-lhe controlo explícito sobre os limites das transações. Começas manualmente uma transação, executas instruções e confirmas ou reverte explicitamente.

Computação suportada: apenas armazéns SQL .

Sintaxe suportada: apenas SQL.

Sintaxe

BEGIN TRANSACTION;

statement1;
statement2;

COMMIT;
-- or: ROLLBACK;

Valide antes de confirmar

Use transações interativas para validar os resultados antes de comprometer:

BEGIN TRANSACTION;

-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();

-- Validate and commit or rollback
BEGIN
  DECLARE duplicate_count INT;
  SET duplicate_count = (
    SELECT COUNT(*) FROM (
      SELECT customer_id, COUNT(*) as cnt
      FROM staging_customers
      WHERE load_date = current_date()
      GROUP BY customer_id
      HAVING COUNT(*) > 1
    )
  );

  IF duplicate_count > 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
  ELSE
    MERGE INTO customers AS target
    USING staging_customers AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;
    COMMIT;
  END IF;
END;

Reversão explícita

Reverter uma transação quando a validação falha ou a lógica de negócio exige descartar alterações:

BEGIN TRANSACTION;

UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;

-- Check if quantity would go negative
BEGIN
  DECLARE new_quantity INT;
  SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);

  IF new_quantity < 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
  ELSE
    COMMIT;
  END IF;
END;

Uso com JDBC

O driver JDBC suporta executar instruções DML usando executeUpdate() dentro de transações. Para uma lista de instruções DML suportadas, veja Operações suportadas.

Os clientes JDBC utilizam transações interativas desativando o modo de auto-commit:

Connection conn = DriverManager.getConnection(jdbcUrl, properties);

try {
    conn.setAutoCommit(false);  // Start transaction mode
    Statement stmt = conn.createStatement();

    stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
    stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");

    conn.commit();  // Commit the transaction

} catch (SQLException e) {
    conn.rollback();  // Roll back on error
    throw e;
} finally {
    conn.close();
}

Operações JDBC não suportadas

As seguintes operações JDBC não são suportadas em transações interativas:

Categoria Não suportado
Comutação de catálogo ou esquema Connection.setCatalog() e Connection.setSchema()
Alterações na configuração da sessão Connection.setClientInfo() para propriedades ao nível da sessão como TIMEZONE e ANSI_MODE
Todos os metadados do banco de dados (todos os protocolos) Todos os DatabaseMetaData.* métodos
Metadados de PreparedStatement PreparedStatement.getMetaData()
Procedimentos armazenados CALL procedure_name()

Utilização com ODBC

O driver ODBC suporta a execução de instruções DML usando SQLExecute() e SQLExecDirect() dentro de transações. Para uma lista de instruções DML suportadas, veja Operações suportadas.

Os clientes ODBC podem usar transações interativas com o driver Azure Databricks ODBC utilizando funções padrão de gestão de transações ODBC.

Operações ODBC não suportadas

As seguintes operações ODBC não são suportadas em transações interativas:

Categoria Não suportado
Todas as funções do catálogo SQLTables, SQLColumns, , SQLStatistics, SQLSpecialColumns, SQLPrimaryKeysSQLForeignKeys, SQLTablePrivileges, SQLColumnPrivilegesSQLProcedures,SQLProcedureColumns
Definição de atributos de ligação Comutação de catálogo, alterações de níveis de isolamento e alterações de modos de acesso usando SQLSetConnectAttr()
Tradução SQL SQLNativeSql

Usar com o Databricks SQL Connector para Python

O Databricks SQL Connector para Python suporta a execução de instruções DML usando cursor.execute() em transações. Para uma lista de instruções DML suportadas, veja Operações suportadas.

As aplicações Python podem usar transações interativas com o Databricks SQL Connector para Python , definindo autocommit=False:

from databricks import sql

with sql.connect(
    server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
    http_path="sql/1.0/warehouses/abc123def456",
    access_token="your-access-token",
    autocommit=False
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
        connection.commit()

Operações de conectores Python não suportadas

As seguintes operações de conector Python não são suportadas em transações interativas:

Categoria Não suportado
Todos os metadados cursor.catalogs(), cursor.schemas(), cursor.tables(), cursor.columns()

Limitações do driver para transações interativas

As seguintes limitações aplicam-se a todos os condutores ao utilizar transações interativas.

As operações de metadados não são suportadas dentro de transações interativas. As seguintes operações podem falhar numa transação, independentemente do driver ou protocolo:

Driver/Protocolo Tipo Methods
JDBC DatabaseMetaData getCatalogs(), getSchemas(), getTables(), getColumns(), getTypeInfo()
ODBC Funções do catálogo SQLTables, SQLColumns, SQLGetTypeInfo
Conector Python Métodos de metadados cursor.catalogs(), cursor.schemas(), cursor.tables(), cursor.columns()
SQL Comandos de metadados SHOW TABLES, SHOW DATABASES, DESCRIBE TABLE, USE CATALOG, USE SCHEMA
SQL information_schema SELECT Consultas em information_schema tabelas

Execute todas as operações de metadados fora das transações.

Advertência

Executar transações em múltiplas threads num único objeto de conexão do controlador conduz a comportamentos indefinidos. Execute apenas uma transação de cada vez em cada objeto de ligação.

Comportamento de isolamento

Alterações não comprometidas numa transação interativa só são visíveis para a sua sessão. Outras sessões mostram o estado da tabela como estava antes do início da transação.

Observação

As transações interativas utilizam uma deteção de conflito mais conservadora do que as não interativas e podem entrar em conflito ao nível da tabela (exceto para anexos incondicionais). Para deteção de conflitos ao nível da linha, use transações não interativas (BEGIN ATOMIC ... END;).

  1. Para verificar o isolamento, crie a tabela de exemplo caso esta não exista:
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
  1. Nessa mesma sessão, inicia uma transação e faz uma alteração:

    BEGIN TRANSACTION;
    INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);
    
  2. Numa aba separada do Editor SQL ou numa sessão de caderno (não numa célula nova no mesmo caderno), consulta a tabela:

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    Isto devolve 0 linhas, porque a alteração não confirmada não é visível fora da sua primeira sessão.

  3. Regresse à sua primeira sessão e comprometa-se:

    COMMIT;
    
  4. Consulta da segunda sessão novamente:

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    A linha é visível devido à transação ter sido concluída.

Este isolamento impede que outros utilizadores leiam dados que possam ser revertidos.

Escolha um modo de transação

Scenario Modo recomendado
Tarefas de ETL agendadas Não interativo — o commit ou rollback automático simplifica o tratamento de erros
Sequências de instruções fixas Não interativo — sintaxe mais simples, sem necessidade de commit manual
Validação de dados antes do commit Interativo — inspecionar resultados e decidir se deve comprometer
Aplicações JDBC que necessitam de controlo manual Interativo — padrões padrão de transação de bases de dados

Passos seguintes