Režimy transakcí

Důležité

Transakce, které se zapisují do spravovaných tabulek Delta katalogu Unity, jsou ve verzi Public Preview.

Transakce, které zapisují do katalogu Unity spravované tabulky Iceberg, jsou ve verzi Private Preview. Pokud se chcete připojit k této verzi Preview, odešlete formulář pro registraci spravovaných tabulek Iceberg ve verzi Preview.

Transakce podporují dva režimy: neinteraktivní a interaktivní. Tato stránka popisuje, kdy použít každý režim a obsahuje příklady implementace.

Požadavky a přehled transakcí naleznete v tématu Transakce. Praktické cvičení s oběma režimy najdete v kurzu : Koordinace transakcí napříč tabulkami.

Poznámka:

Všechny tabulky, které jsou zapsány ve více příkazech v rámci transakce s více tabulkami, musí:

Neinteraktivní transakce

Neinteraktivní transakce používají skriptování SQL s klíčovým slovem ATOMIC . Blok složeného příkazu ATOMIC spouští všechny příkazy jako jednu atomickou jednotku. Všichni uspějí společně nebo všichni selžou společně.

Podporované výpočetní prostředky: Libovolný SQL Warehouse, bezserverový výpočetní výkon nebo cluster s Modulem Databricks Runtime 18.0 a novějším.

Podporovaná syntaxe: Podporuje bloky SQL, Scala spark.sql a PySpark spark.sql .

Poznámka:

Neinteraktivní transakce v rámci strukturovaného streamování forEachBatch můžete použít voláním spark.sql("BEGIN ATOMIC ... END;"). Kontrolní body strukturovaného streamování se však neposouvají transakčně.

Syntaxe

BEGIN ATOMIC
  statement1;
  statement2;
  statement3;
END;

Azure Databricks automaticky potvrdí všechny změny, pokud budou všechny příkazy úspěšné. Pokud některý příkaz selže, Azure Databricks automaticky vrátí všechny změny zpět.

Použití v editoru SQL

Spusťte neinteraktivní transakce přímo v editoru SQL. Vyberte celý blok složeného příkazu ATOMIC a spusťte ho jako jeden příkaz:

BEGIN ATOMIC
  DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;

  INSERT INTO staging_sales
  SELECT * FROM raw_sales WHERE load_date = current_date();

  MERGE INTO sales AS target
  USING staging_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Použití v noteboocích

Spouštění neinteraktivních transakcí v poznámkových blocích pomocí buněk SQL nebo programových rozhraní API

SQL

BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;

Python

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Scala

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Použití v naplánovaných úlohách

Neinteraktivní transakce fungují dobře v naplánovaných úlohách, protože automaticky zpracovávají potvrzení (commit) a návrat (rollback).

BEGIN ATOMIC
  -- Clear previous staging data
  DELETE FROM staging_daily_sales WHERE load_date = current_date();

  -- Load new data
  INSERT INTO staging_daily_sales
  SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
  FROM raw_sales
  WHERE sale_date = current_date() - INTERVAL 1 DAY;

  -- Validate row count (fails transaction if no data)
  IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
  END IF;

  -- Merge into production
  MERGE INTO daily_sales AS target
  USING staging_daily_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Pokud jakýkoli příkaz selže, včetně asercí, celá transakce se zruší automaticky.

Použití s JDBC

Externí klienti můžou spouštět neinteraktivní transakce.

JDBC

String sql = """
    BEGIN ATOMIC
      INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
      UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
    END;
    """;

Statement stmt = conn.createStatement();
stmt.execute(sql);

Použití s rozhraním API pro provádění příkazů

Spusťte neinteraktivní transakce pomocí rozhraní API pro spouštění příkazů:

import requests

sql = """
BEGIN ATOMIC
  INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
  UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""

response = requests.post(
    f"{workspace_url}/api/2.0/sql/statements",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "warehouse_id": warehouse_id,
        "statement": sql,
        "wait_timeout": "30s"
    }
)

Vzory ETL

Následující vzory ukazují běžné pracovní postupy ETL využívající neinteraktivní transakce.

Vzor přípravy a ověřování

Tento model načte data do meziskladu, ověří kvalitu dat a sloučí ověřené záznamy do produkčních tabulek.

BEGIN ATOMIC
  -- Load into staging
  INSERT INTO staging_customers
  SELECT * FROM external_source
  WHERE ingest_date = current_date();

  -- Validate data quality
  IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
  END IF;

  -- Merge validated data
  MERGE INTO customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Update metadata
  UPDATE etl_metadata
  SET last_load_date = current_date(),
      rows_processed = (SELECT COUNT(*) FROM staging_customers)
  WHERE table_name = 'customers';
END;

Model tabulky dimenzí a faktů

Tento model aktualizuje tabulky dimenzí před načtením tabulek faktů, aby se zachovala referenční integrita:

BEGIN ATOMIC
  -- Update dimension tables first
  MERGE INTO dim_products AS target
  USING staging_products AS source
  ON target.product_id = source.product_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  MERGE INTO dim_customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Then load fact table with foreign key references
  INSERT INTO fact_sales
  SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
  FROM staging_sales s
  JOIN dim_products p ON s.product_id = p.product_id
  JOIN dim_customers c ON s.customer_id = c.customer_id;
END;

Zpracování chyb

Pokud příkaz selže v rámci bloku BEGIN ATOMIC ... END;, Azure Databricks vrátí zpět všechny změny a vrátí chybovou zprávu.

Tipy pro odstraňování chyb:

  1. Zkontrolujte chybovou zprávu a zjistěte, který příkaz selhal.
  2. Testujte příkazy jednotlivě mimo transakční blok.
  3. Přidejte kontroly ověření pomocí SIGNAL, které při selhání zobrazí vlastní chybové zprávy.
  4. Dotaz na historii transakcí pro doplňující kontext

Interaktivní transakce

Interaktivní transakce poskytují explicitní kontrolu nad hranicemi transakcí. Transakci zahájíte ručně, spustíte příkazy a explicitně potvrdíte nebo vrátíte zpět.

Podporované výpočetní prostředky: Pouze služby SQL Warehouse .

Podporovaná syntaxe: Pouze SQL

Syntaxe

BEGIN TRANSACTION;

statement1;
statement2;

COMMIT;
-- or: ROLLBACK;

Ověřte před potvrzením

Před potvrzením ověřte výsledky pomocí interaktivních transakcí:

BEGIN TRANSACTION;

-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();

-- Validate and commit or rollback
BEGIN
  DECLARE duplicate_count INT;
  SET duplicate_count = (
    SELECT COUNT(*) FROM (
      SELECT customer_id, COUNT(*) as cnt
      FROM staging_customers
      WHERE load_date = current_date()
      GROUP BY customer_id
      HAVING COUNT(*) > 1
    )
  );

  IF duplicate_count > 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
  ELSE
    MERGE INTO customers AS target
    USING staging_customers AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;
    COMMIT;
  END IF;
END;

Explicitní vrácení zpět

Vrácení transakce zpět při selhání ověření nebo obchodní logika vyžaduje zahození změn:

BEGIN TRANSACTION;

UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;

-- Check if quantity would go negative
BEGIN
  DECLARE new_quantity INT;
  SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);

  IF new_quantity < 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
  ELSE
    COMMIT;
  END IF;
END;

Použití s JDBC

Ovladač JDBC podporuje provádění příkazů DML pomocí executeUpdate() v rámci transakcí. Seznam podporovaných příkazů DML najdete v tématu Podporované operace.

Klienti JDBC používají interaktivní transakce zakázáním režimu automatického potvrzení:

Connection conn = DriverManager.getConnection(jdbcUrl, properties);

try {
    conn.setAutoCommit(false);  // Start transaction mode
    Statement stmt = conn.createStatement();

    stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
    stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");

    conn.commit();  // Commit the transaction

} catch (SQLException e) {
    conn.rollback();  // Roll back on error
    throw e;
} finally {
    conn.close();
}

Nepodporované operace JDBC

V rámci interaktivních transakcí se nepodporují následující operace JDBC:

Kategorie Nepodporováno
Přepínání katalogu nebo schématu Connection.setCatalog() a Connection.setSchema()
Změny konfigurace relace Connection.setClientInfo() pro vlastnosti na úrovni relace, jako TIMEZONE a ANSI_MODE
Všechny DatabaseMetaData (všechny protokoly) Všechny DatabaseMetaData.* metody
Metadata pro PreparedStatement PreparedStatement.getMetaData()
Uložené procedury CALL procedure_name()

Použití s rozhraním ODBC

Ovladač ODBC podporuje spouštění příkazů DML pomocí SQLExecute() a SQLExecDirect() v rámci transakcí. Seznam podporovaných příkazů DML najdete v tématu Podporované operace.

Klienti ODBC mohou používat interaktivní transakce s ovladačem Azure Databricks ODBC pomocí standardních funkcí správy transakcí ODBC.

Poznámka:

AutoCommit musí být zakázán, aby bylo možné používat transakce. UseNativeQuery musí být nastaveno na 1, aby se za běhu zakázalo AutoCommit.

Nepodporované operace ODBC

Následující operace ODBC nejsou podporovány v rámci interaktivních transakcí:

Kategorie Nepodporováno
Všechny funkce katalogu SQLTables, SQLColumns, SQLStatistics, SQLSpecialColumns, SQLPrimaryKeys, SQLForeignKeys, SQLTablePrivileges, SQLColumnPrivileges, SQLProcedures, SQLProcedureColumns
Nastavení atributů připojení Přepínání katalogu, změny na úrovni izolace a změny režimu přístupu pomocí SQLSetConnectAttr()
Překlad SQL SQLNativeSql

Použití s Databricks SQL konektorem pro Python

Konektor SQL Databricks pro Python podporuje spouštění příkazů DML pomocí příkazu cursor.execute() v rámci transakcí. Seznam podporovaných příkazů DML najdete v tématu Podporované operace.

Python aplikace můžou používat interaktivní transakce s konektorem SQL Databricks pro Python nastavením autocommit=False:

from databricks import sql

with sql.connect(
    server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
    http_path="sql/1.0/warehouses/abc123def456",
    access_token="your-access-token",
    autocommit=False
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
        connection.commit()

Nepodporované operace konektoru Python

Interaktivní transakce nepodporují následující operace konektoru Python:

Kategorie Nepodporováno
Všechna metadata cursor.catalogs(), cursor.schemas(), , cursor.tables()cursor.columns()

Omezení ovladačů pro interaktivní transakce

Následující omezení platí pro všechny ovladače při použití interaktivních transakcí.

Operace metadat nejsou podporovány uvnitř interaktivních transakcí. Následující operace můžou selhat v rámci transakce bez ohledu na ovladač nebo protokol:

Ovladač/protokol Typ Methods
JDBC DatabaseMetaData getCatalogs(), getSchemas(), getTables(), , getColumns()getTypeInfo()
rozhraní ODBC Funkce katalogu SQLTables SQLColumns, SQLGetTypeInfo
konektor Python Metody metadat cursor.catalogs(), cursor.schemas(), , cursor.tables()cursor.columns()
SQL Příkazy metadat SHOW TABLES, SHOW DATABASES, DESCRIBE TABLE, , USE CATALOGUSE SCHEMA
SQL information_schema SELECT dotazy na information_schema tabulky

Spusťte všechny operace metadat mimo transakce.

Výstraha

Provádění transakcí na více vláknech na jednom objektu připojení ovladače vede k nedefinovanému chování. Na každém objektu připojení spusťte vždy pouze jednu transakci.

Chování izolace

Nepotvrzené změny v interaktivní transakci jsou viditelné pouze pro vaši relaci. Ostatní relace vidí stav tabulky tak, jak byl před zahájením vaší transakce.

Poznámka:

Interaktivní transakce používají konzervativnější konfliktovou detekci než neinteraktivní transakce a mohou kolidovat na úrovni tabulky (s výjimkou nepodmíněného appendování). Pro detekci konfliktů na úrovni řádků použijte neinteraktivní transakce (BEGIN ATOMIC ... END;).

  1. Pokud chcete ověřit izolaci, vytvořte ukázkovou tabulku, pokud neexistuje:
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
  1. Ve stejné relaci spusťte transakci a proveďte změnu:

    BEGIN TRANSACTION;
    INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);
    
  2. Na samostatné kartě Editoru SQL nebo v relaci poznámkového bloku (ne v nové buňce ve stejném poznámkovém bloku) zadejte dotaz na tabulku:

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    Vrátí 0 řádků, protože nepotvrzená změna není viditelná mimo vaše první sezení.

  3. Vraťte se do prvního sezení a proveďte změny:

    COMMIT;
    
  4. Znovu zadejte dotaz z druhé relace:

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    Řádek je viditelný, protože transakce byla potvrzená.

Tato izolace brání ostatním uživatelům ve čtení dat, která mohou být zrušena.

Volba režimu transakce

Scénář Doporučený režim
Naplánované úlohy ETL Neinteraktivní – automatické potvrzení nebo vrácení zpět zjednodušuje zpracování chyb.
Pevné sekvence příkazů Neinteraktivní – jednodušší syntaxe, nevyžaduje se ruční potvrzení
Ověření dat před potvrzením Interaktivní – zkontrolujte výsledky a rozhodněte se, zda je potvrdit
Aplikace JDBC, které potřebují ruční řízení Interaktivní – vzory standardních databázových transakcí

Další kroky