Partage via


Modes de transaction

Important

Les transactions qui écrivent dans les tables Delta gérées par Unity Catalog sont en Aperçu public.

Les transactions qui écrivent dans les tables Iceberg gérées par le catalogue Unity sont en préversion privée. Pour rejoindre cette préversion, envoyez le formulaire d’inscription en préversion des tables Iceberg managées.

Les transactions prennent en charge deux modes : non interactif et interactif. Cette page décrit quand utiliser chaque mode et inclut des exemples d’implémentation.

Pour connaître les exigences et une vue d’ensemble des transactions, consultez Transactions. Pour une pratique pratique avec les deux modes, consultez Tutoriel : Coordonner les transactions à travers les tables.

Note

Toutes les tables écrites dans une transaction multi-instruction, multi-table doivent :

Transactions non interactives

Les transactions non interactives utilisent des scripts SQL avec le ATOMIC mot clé. Le bloc d’instructions atomiques exécute toutes les instructions sous la forme d’une unité atomique unique. Tous réussissent ensemble ou échouent ensemble.

Calcul pris en charge : n’importe quel entrepôt SQL, calcul serverless ou cluster exécutant Databricks Runtime 18.0 et versions ultérieures.

Syntaxe prise en charge : prend en charge les blocs SQL, Scala spark.sql et PySpark spark.sql .

Note

Vous pouvez utiliser des transactions non interactives dans Structured Streaming en appelant la fonction forEachBatch via spark.sql("BEGIN ATOMIC ... END;"). Toutefois, les points de contrôle de "Structured Streaming" n'avancent pas transactionnellement.

Syntaxe

BEGIN ATOMIC
  statement1;
  statement2;
  statement3;
END;

Azure Databricks valide automatiquement toutes les modifications si toutes les instructions réussissent. Si une instruction échoue, Azure Databricks annule automatiquement toutes les modifications.

Utiliser dans l’éditeur SQL

Exécutez des transactions non interactives directement dans l’Éditeur SQL. Sélectionnez l’intégralité du bloc d’instructions composé ATOMIC et exécutez-le en tant qu’instruction unique :

BEGIN ATOMIC
  DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;

  INSERT INTO staging_sales
  SELECT * FROM raw_sales WHERE load_date = current_date();

  MERGE INTO sales AS target
  USING staging_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Utiliser dans les notebooks

Exécutez des transactions non interactives dans des notebooks à l’aide de cellules SQL ou d’API programmatiques.

SQL

BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;

Python

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Scala

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Utiliser dans les travaux planifiés

Les transactions non interactives fonctionnent bien dans les travaux planifiés, car elles gèrent automatiquement la validation et la restauration :

BEGIN ATOMIC
  -- Clear previous staging data
  DELETE FROM staging_daily_sales WHERE load_date = current_date();

  -- Load new data
  INSERT INTO staging_daily_sales
  SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
  FROM raw_sales
  WHERE sale_date = current_date() - INTERVAL 1 DAY;

  -- Validate row count (fails transaction if no data)
  IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
  END IF;

  -- Merge into production
  MERGE INTO daily_sales AS target
  USING staging_daily_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Si une instruction échoue, y compris lorsqu’il s’agit de l’assertion, la transaction entière est annulée automatiquement.

Utiliser avec JDBC

Les clients externes peuvent exécuter des transactions non interactives.

JDBC

String sql = """
    BEGIN ATOMIC
      INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
      UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
    END;
    """;

Statement stmt = conn.createStatement();
stmt.execute(sql);

Utiliser avec l'API d'exécution des requêtes

Exécutez des transactions non interactives à l’aide de l’API d’exécution d’instruction :

import requests

sql = """
BEGIN ATOMIC
  INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
  UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""

response = requests.post(
    f"{workspace_url}/api/2.0/sql/statements",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "warehouse_id": warehouse_id,
        "statement": sql,
        "wait_timeout": "30s"
    }
)

Modèles ETL

Les modèles suivants illustrent des flux de travail ETL courants utilisant des transactions non interactives.

Modèle de mise en scène et de validation

Ce modèle charge les données dans une zone intermédiaire, valide la qualité des données et fusionne les enregistrements validés dans des tables de production :

BEGIN ATOMIC
  -- Load into staging
  INSERT INTO staging_customers
  SELECT * FROM external_source
  WHERE ingest_date = current_date();

  -- Validate data quality
  IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
  END IF;

  -- Merge validated data
  MERGE INTO customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Update metadata
  UPDATE etl_metadata
  SET last_load_date = current_date(),
      rows_processed = (SELECT COUNT(*) FROM staging_customers)
  WHERE table_name = 'customers';
END;

Modèle de table de faits et de dimension

Ce modèle met à jour les tables de dimension avant de charger des tables de faits pour maintenir l’intégrité référentielle :

BEGIN ATOMIC
  -- Update dimension tables first
  MERGE INTO dim_products AS target
  USING staging_products AS source
  ON target.product_id = source.product_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  MERGE INTO dim_customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Then load fact table with foreign key references
  INSERT INTO fact_sales
  SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
  FROM staging_sales s
  JOIN dim_products p ON s.product_id = p.product_id
  JOIN dim_customers c ON s.customer_id = c.customer_id;
END;

Gestion des erreurs

Lorsqu’une instruction échoue dans un BEGIN ATOMIC ... END; bloc, Azure Databricks restaure toutes les modifications et retourne un message d’erreur.

Conseils de débogage :

  1. Passez en revue le message d’erreur pour identifier l’instruction qui a échoué.
  2. Testez les déclarations individuellement en dehors du bloc de transaction.
  3. Ajoutez des vérifications de validation avec SIGNAL pour échouer avec des messages d’erreur personnalisés.
  4. Interroger l’historique des transactions pour un contexte supplémentaire.

Transactions interactives

Les transactions interactives vous permettent de contrôler explicitement les limites des transactions. Vous commencez manuellement une transaction, exécutez des instructions et validez ou annulez explicitement.

Calcul pris en charge : entrepôts SQL uniquement.

Syntaxe prise en charge : SQL uniquement.

Syntaxe

BEGIN TRANSACTION;

statement1;
statement2;

COMMIT;
-- or: ROLLBACK;

Valider avant l'engagement

Utilisez des transactions interactives pour valider les résultats avant de les effectuer.

BEGIN TRANSACTION;

-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();

-- Validate and commit or rollback
BEGIN
  DECLARE duplicate_count INT;
  SET duplicate_count = (
    SELECT COUNT(*) FROM (
      SELECT customer_id, COUNT(*) as cnt
      FROM staging_customers
      WHERE load_date = current_date()
      GROUP BY customer_id
      HAVING COUNT(*) > 1
    )
  );

  IF duplicate_count > 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
  ELSE
    MERGE INTO customers AS target
    USING staging_customers AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;
    COMMIT;
  END IF;
END;

Retour en arrière explicite

Restaurer une transaction lorsque la validation échoue ou si la logique métier nécessite de supprimer les modifications :

BEGIN TRANSACTION;

UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;

-- Check if quantity would go negative
BEGIN
  DECLARE new_quantity INT;
  SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);

  IF new_quantity < 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
  ELSE
    COMMIT;
  END IF;
END;

Utiliser avec JDBC

Le pilote JDBC prend en charge l'exécution d'instructions DML à l'aide de executeUpdate() au sein des transactions. Pour obtenir la liste des instructions DML prises en charge, consultez opérations prises en charge.

Les clients JDBC utilisent des transactions interactives en désactivant le mode de validation automatique :

Connection conn = DriverManager.getConnection(jdbcUrl, properties);

try {
    conn.setAutoCommit(false);  // Start transaction mode
    Statement stmt = conn.createStatement();

    stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
    stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");

    conn.commit();  // Commit the transaction

} catch (SQLException e) {
    conn.rollback();  // Roll back on error
    throw e;
} finally {
    conn.close();
}

Opérations JDBC non prises en charge

Les opérations JDBC suivantes ne sont pas prises en charge dans les transactions interactives :

Catégorie Non pris en charge
Changement de catalogue ou de schéma Connection.setCatalog() et Connection.setSchema()
Modifications de configuration de session Connection.setClientInfo() pour les propriétés au niveau de la session, telles que TIMEZONE et ANSI_MODE
All DatabaseMetaData (tous les protocoles) Toutes les DatabaseMetaData.* méthodes
Métadonnées PreparedStatement PreparedStatement.getMetaData()
Procédures stockées CALL procedure_name()

Utiliser avec ODBC

Le pilote ODBC prend en charge l’exécution d’instructions DML à l’aide SQLExecute() et SQLExecDirect() dans les transactions. Pour obtenir la liste des instructions DML prises en charge, consultez opérations prises en charge.

Les clients ODBC peuvent utiliser des transactions interactives avec le pilote ODBC Azure Databricks à l’aide de fonctions de gestion des transactions ODBC standard.

Note

AutoCommit doit être désactivé pour utiliser des transactions. UseNativeQuery doit être défini à 1 pour désactiver AutoCommit au moment de l’exécution.

Opérations ODBC non prises en charge

Les opérations ODBC suivantes ne sont pas prises en charge dans les transactions interactives :

Catégorie Non pris en charge
Toutes les fonctions de catalogue SQLTables, SQLColumns, SQLStatistics, SQLSpecialColumns, SQLPrimaryKeys, SQLForeignKeys, SQLTablePrivileges, SQLColumnPrivileges, SQLProcedures, SQLProcedureColumns
Définition des attributs de connexion Changement de catalogue, modifications de niveau d’isolation et modifications du mode d’accès à l’aide de SQLSetConnectAttr()
Traduction SQL SQLNativeSql

Utiliser avec databricks SQL Connector pour Python

Databricks SQL Connector pour Python prend en charge l’exécution d’instructions DML à l'aide de cursor.execute() dans le cadre de transactions. Pour obtenir la liste des instructions DML prises en charge, consultez opérations prises en charge.

Les applications Python peuvent utiliser des transactions interactives avec le connecteur Databricks SQL pour Python en définissant autocommit=False:

from databricks import sql

with sql.connect(
    server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
    http_path="sql/1.0/warehouses/abc123def456",
    access_token="your-access-token",
    autocommit=False
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
        connection.commit()

Opérations de connecteur Python non prises en charge

Les opérations de connecteur Python suivantes ne sont pas prises en charge dans les transactions interactives :

Catégorie Non pris en charge
Toutes les métadonnées cursor.catalogs(), cursor.schemas(), cursor.tables(), cursor.columns()

Limitations du pilote pour les transactions interactives

Les limitations suivantes s’appliquent à tous les pilotes lors de l’utilisation de transactions interactives.

Les opérations de métadonnées ne sont pas prises en charge dans les transactions interactives. Les opérations suivantes peuvent échouer dans une transaction, quel que soit le pilote ou le protocole :

Pilote/protocole Type Méthodes
JDBC DatabaseMetaData getCatalogs(), getSchemas(), , getTables(), getColumns(), getTypeInfo()
ODBC Fonctions catalogue SQLTables, SQLColumns, SQLGetTypeInfo
Connecteur Python Méthodes de métadonnées cursor.catalogs(), cursor.schemas(), cursor.tables(), cursor.columns()
SQL Commandes de métadonnées SHOW TABLES, SHOW DATABASES, , DESCRIBE TABLE, USE CATALOG, USE SCHEMA
SQL information_schema SELECT requêtes sur information_schema les tables

Exécutez toutes les opérations de métadonnées en dehors des transactions.

Avertissement

L’exécution de transactions sur plusieurs threads sur un seul objet de connexion de pilote entraîne un comportement non défini. Exécutez une seule transaction à la fois sur chaque objet de connexion.

Comportement d’isolation

Les modifications non validées dans une transaction interactive ne sont visibles que pour votre session. D’autres sessions voient l’état de la table tel qu’il était avant le début de votre transaction.

Note

Les transactions interactives utilisent une détection de conflit plus conservatrice que les transactions non interactives et peuvent entrer en conflit au niveau de la table (à l’exception des ajouts conditionnels). Pour la détection des conflits au niveau des lignes, utilisez des transactions non interactives (BEGIN ATOMIC ... END;).

  1. Pour vérifier l’isolation, créez la table d'exemple si elle n’existe pas :
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
  1. Dans cette même session, démarrez une transaction et apportez une modification :

    BEGIN TRANSACTION;
    INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);
    
  2. Dans un onglet éditeur SQL distinct ou une session de notebook (pas une nouvelle cellule dans le même bloc-notes), interrogez le tableau :

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    Cela retourne 0 lignes, car la modification non validée n’est pas visible en dehors de votre première session.

  3. Revenez à votre première session et validez :

    COMMIT;
    
  4. Interrogez à nouveau à partir de la deuxième session :

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    La ligne est visible parce que la transaction a été validée.

Cette isolation empêche les autres utilisateurs de lire des données qui peuvent être restaurées.

Choisir un mode de transaction

Scénario Mode recommandé
Travaux ETL planifiés Non interactif : la validation automatique ou la restauration simplifie la gestion des erreurs
Séquences d’instructions fixes Non interactive : syntaxe plus simple, aucune validation manuelle n’est nécessaire
Validation des données avant validation Interactive : inspecter les résultats et décider s’il faut valider
Applications JDBC nécessitant un contrôle manuel Interactive : modèles de transaction de base de données standard

Étapes suivantes