Aracılığıyla paylaş


İşlem modları

Önemli

Unity Kataloğu yönetilen Delta tablolarına yazan işlemler Genel Önizleme aşamasındadır.

Unity Kataloğu tarafından yönetilen Iceberg tablolarına yazılan işlemler Özel Önizleme aşamasındadır. Bu önizlemeye katılmak için yönetilen Iceberg tabloları önizleme kayıt formunu gönderin.

İşlemler iki modu destekler: etkileşimli olmayan ve etkileşimli olmayan. Bu sayfa, her modun ne zaman kullanılacağını kapsar ve uygulama örnekleri içerir.

Gereksinimler ve işlemlere genel bakış için bkz. İşlemler. Her iki modla uygulamalı alıştırma için bkz . Öğretici: Tablolar arasında işlemleri koordine edin.

Uyarı

Birden çok deyimli, çok tablolu bir işlemde yazılan tüm tablolar şu koşulları sağlamalıdır:

Etkileşimli olmayan işlemler

Etkileşimli olmayan işlemler, anahtar sözcüğüyle ATOMIC kullanır. ATOMIC bileşik deyimi bloğu, tüm deyimleri tek bir atomik birim olarak çalıştırır. Herkes ya birlikte başarılı olur ya da birlikte başarısız olur.

Desteklenen işlem: Databricks Runtime 18.0 ve üzerini çalıştıran herhangi bir SQL ambarı, sunucusuz işlem veya küme .

Desteklenen söz dizimi: SQL, Scala spark.sql blokları ve PySpark spark.sql bloklarını destekler.

Uyarı

Yapılandırılmış Akış’lar içindeki forEachBatch öğesini spark.sql("BEGIN ATOMIC ... END;") çağırarak etkileşimli olmayan işlemleri kullanabilirsiniz. Ancak, Yapılandırılmış Akış denetim noktaları işlemsel olarak ilerlemez.

Sözdizimi

BEGIN ATOMIC
  statement1;
  statement2;
  statement3;
END;

Tüm deyimler başarılı olursa Azure Databricks tüm değişiklikleri otomatik olarak kaydeder. Herhangi bir ifade başarısız olursa Azure Databricks tüm değişiklikleri otomatik olarak geri alma işlemi gerçekleştirir.

SQL düzenleyicisinde kullanma

Etkileşimli olmayan işlemleri doğrudan SQL Düzenleyicisi'nde çalıştırın. ATOMIC bileşik deyimi bloğunun tamamını seçin ve tek bir deyim olarak çalıştırın:

BEGIN ATOMIC
  DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;

  INSERT INTO staging_sales
  SELECT * FROM raw_sales WHERE load_date = current_date();

  MERGE INTO sales AS target
  USING staging_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Not defterlerinde kullanma

SQL hücrelerini veya programlı API'leri kullanarak not defterlerinde etkileşimli olmayan işlemler çalıştırın.

SQL

BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;

Python

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Scala

spark.sql("""
BEGIN ATOMIC
  UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
  UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
  INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
  VALUES (2001, 2002, 10, current_date());
END;
""")

Zamanlanmış işlerde kullanma

Etkileşimli olmayan işlemler, işleme ve geri alma işlemlerini otomatik olarak işlediği için zamanlanmış işlerde düzgün çalışır:

BEGIN ATOMIC
  -- Clear previous staging data
  DELETE FROM staging_daily_sales WHERE load_date = current_date();

  -- Load new data
  INSERT INTO staging_daily_sales
  SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
  FROM raw_sales
  WHERE sale_date = current_date() - INTERVAL 1 DAY;

  -- Validate row count (fails transaction if no data)
  IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
  END IF;

  -- Merge into production
  MERGE INTO daily_sales AS target
  USING staging_daily_sales AS source
  ON target.sale_id = source.sale_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;
END;

Doğrulama dahil olmak üzere herhangi bir ifade başarısız olursa, tüm işlem otomatik olarak geri alınır.

JDBC ile kullanma

Dış istemciler etkileşimli olmayan işlemler çalıştırabilir.

JDBC

String sql = """
    BEGIN ATOMIC
      INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
      UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
    END;
    """;

Statement stmt = conn.createStatement();
stmt.execute(sql);

Deyim Yürütme API'siyle kullanma

Deyim Yürütme API'sini kullanarak etkileşimli olmayan işlemler çalıştırın:

import requests

sql = """
BEGIN ATOMIC
  INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
  UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""

response = requests.post(
    f"{workspace_url}/api/2.0/sql/statements",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "warehouse_id": warehouse_id,
        "statement": sql,
        "wait_timeout": "30s"
    }
)

ETL desenleri

Aşağıdaki desenler etkileşimli olmayan işlemler kullanan yaygın ETL iş akışlarını gösterir.

Aşama ve doğrulama şablonu

Bu düzen verileri hazırlama alanına yükler, veri kalitesini doğrular ve doğrulanmış kayıtları üretim tablolarıyla birleştirir:

BEGIN ATOMIC
  -- Load into staging
  INSERT INTO staging_customers
  SELECT * FROM external_source
  WHERE ingest_date = current_date();

  -- Validate data quality
  IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
  END IF;

  -- Merge validated data
  MERGE INTO customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Update metadata
  UPDATE etl_metadata
  SET last_load_date = current_date(),
      rows_processed = (SELECT COUNT(*) FROM staging_customers)
  WHERE table_name = 'customers';
END;

Boyut ve bilgi tablosu deseni

Bu düzen bilgi tutarlılığını korumak için olgu tablolarını yüklemeden önce boyut tablolarını güncelleştirir:

BEGIN ATOMIC
  -- Update dimension tables first
  MERGE INTO dim_products AS target
  USING staging_products AS source
  ON target.product_id = source.product_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  MERGE INTO dim_customers AS target
  USING staging_customers AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *;

  -- Then load fact table with foreign key references
  INSERT INTO fact_sales
  SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
  FROM staging_sales s
  JOIN dim_products p ON s.product_id = p.product_id
  JOIN dim_customers c ON s.customer_id = c.customer_id;
END;

Hata işleme

Bir BEGIN ATOMIC ... END; bloğu içinde bir deyim başarısız olduğunda, Azure Databricks yapılan tüm değişiklikleri geri alır ve bir hata mesajı verir.

Hata ayıklama ipuçları:

  1. Hangi deyimin başarısız olduğunu belirlemek için hata iletisini gözden geçirin.
  2. İfadeleri transaction blokunun dışında ayrı ayrı test edin.
  3. Özel hata iletileriyle başarısız olmak için kullanarak SIGNAL doğrulama denetimleri ekleyin.
  4. Ek bağlam için işlem geçmişini sorgulayın.

Etkileşimli işlemler

Etkileşimli işlemler size işlem sınırları üzerinde açık denetim sağlar. Bir işlemi el ile başlatır, deyimleri çalıştırır ve açıkça işler veya geri alırsınız.

Desteklenen hesaplama: Yalnızca SQL veri ambarları.

Desteklenen söz dizimi: Yalnızca SQL.

Sözdizimi

BEGIN TRANSACTION;

statement1;
statement2;

COMMIT;
-- or: ROLLBACK;

İşlemeden önce doğrulama

İşlemeden önce sonuçları doğrulamak için etkileşimli işlemleri kullanın:

BEGIN TRANSACTION;

-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();

-- Validate and commit or rollback
BEGIN
  DECLARE duplicate_count INT;
  SET duplicate_count = (
    SELECT COUNT(*) FROM (
      SELECT customer_id, COUNT(*) as cnt
      FROM staging_customers
      WHERE load_date = current_date()
      GROUP BY customer_id
      HAVING COUNT(*) > 1
    )
  );

  IF duplicate_count > 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
  ELSE
    MERGE INTO customers AS target
    USING staging_customers AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;
    COMMIT;
  END IF;
END;

Belirgin geri alma

Doğrulama başarısız olduğunda veya iş mantığı değişiklikleri atılması gerektiğinde işlemi geri alın:

BEGIN TRANSACTION;

UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;

-- Check if quantity would go negative
BEGIN
  DECLARE new_quantity INT;
  SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);

  IF new_quantity < 0 THEN
    ROLLBACK;
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
  ELSE
    COMMIT;
  END IF;
END;

JDBC ile kullanma

JDBC sürücüsü, executeUpdate() kullanarak işlemler içinde DML deyimlerini çalıştırmayı destekler. Desteklenen DML deyimlerinin listesi için bkz . Desteklenen işlemler.

JDBC istemcileri, otomatik işleme modunu devre dışı bırakarak etkileşimli işlemler kullanır:

Connection conn = DriverManager.getConnection(jdbcUrl, properties);

try {
    conn.setAutoCommit(false);  // Start transaction mode
    Statement stmt = conn.createStatement();

    stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
    stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");

    conn.commit();  // Commit the transaction

} catch (SQLException e) {
    conn.rollback();  // Roll back on error
    throw e;
} finally {
    conn.close();
}

Desteklenmeyen JDBC işlemleri

Aşağıdaki JDBC işlemleri etkileşimli işlemlerde desteklenmez:

Kategori Desteklenmiyor
Katalog veya şema değiştirme Connection.setCatalog() ve Connection.setSchema()
Oturum yapılandırma değişiklikleri Connection.setClientInfo() gibi oturum düzeyi özellikleri için TIMEZONE ve ANSI_MODE
Tüm DatabaseMetaData (tüm protokoller) Tüm DatabaseMetaData.* yöntemler
PreparedStatement meta verileri PreparedStatement.getMetaData()
Saklanan prosedürler CALL procedure_name()

ODBC ile kullanma

ODBC sürücüsü, işlemler içinde SQLExecute() ve SQLExecDirect() kullanarak DML deyimlerinin çalıştırılmasını destekler. Desteklenen DML deyimlerinin listesi için bkz . Desteklenen işlemler.

ODBC istemcileri, standart ODBC işlem yönetimi işlevlerini kullanarak Azure Databricks ODBC sürücüsüyle etkileşimli işlemler kullanabilir.

Desteklenmeyen ODBC işlemleri

Aşağıdaki ODBC işlemleri etkileşimli işlemlerde desteklenmez:

Kategori Desteklenmiyor
Tüm katalog işlevleri SQLTables, SQLColumns, SQLStatistics, SQLSpecialColumns, SQLPrimaryKeys, SQLForeignKeys, SQLTablePrivileges, SQLColumnPrivileges, SQLProcedures, SQLProcedureColumns
Bağlantı özniteliklerini ayarlama Katalog değiştirmeleri, yalıtım düzeyi değişiklikleri ve erişim modu değişiklikleri SQLSetConnectAttr() kullanılarak yapılır.
SQL çevirisi SQLNativeSql

Python için Databricks SQL Bağlayıcısı ile kullanma

Python için Databricks SQL Bağlayıcısı, işlemlerin içinde kullanarak cursor.execute() DML deyimlerinin çalıştırılmasını destekler. Desteklenen DML deyimlerinin listesi için bkz . Desteklenen işlemler.

Python uygulamaları, autocommit=False ayarlarını yapılandırarak etkileşimli işlemler kullanabilir:

from databricks import sql

with sql.connect(
    server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
    http_path="sql/1.0/warehouses/abc123def456",
    access_token="your-access-token",
    autocommit=False
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
        connection.commit()

Desteklenmeyen Python bağlayıcısı işlemleri

Aşağıdaki Python bağlayıcısı işlemleri etkileşimli işlemlerde desteklenmez:

Kategori Desteklenmiyor
Tüm meta veriler cursor.catalogs(), cursor.schemas(), cursor.tables(), cursor.columns()

Etkileşimli işlemler için sürücü sınırlamaları

Etkileşimli işlemler kullanılırken tüm sürücüler için aşağıdaki sınırlamalar geçerlidir.

Meta veri işlemleri etkileşimli işlemlerde desteklenmez. Aşağıdaki işlemler, sürücü veya protokolden bağımsız olarak bir işlem içinde başarısız olabilir:

Sürücü/Protokol Türü Methods
JDBC DatabaseMetaData getCatalogs(), getSchemas(), getTables(), getColumns(), , getTypeInfo()
ODBC Katalog işlevleri SQLTables, SQLColumns, SQLGetTypeInfo
Python bağlayıcısı Meta veri yöntemleri cursor.catalogs(), cursor.schemas(), cursor.tables(), cursor.columns()
SQL Meta veri komutları SHOW TABLES, SHOW DATABASES, DESCRIBE TABLE, USE CATALOG, , USE SCHEMA
SQL information_schema SELECT tablolar üzerinde information_schema sorgular

tüm meta veri işlemlerini işlemlerin dışında çalıştırın.

Uyarı

Tek bir sürücü bağlantı nesnesinde birden çok iş parçacığı üzerinde işlem çalıştırmak tanımsız davranışa neden olur. Her bağlantı nesnesinde aynı anda yalnızca bir işlem çalıştırın.

Yalıtım davranışı

Etkileşimli bir işlemdeki kaydedilmemiş değişiklikler yalnızca oturumunuz tarafından görülebilir. Diğer oturumlar, tablo durumunu işleminiz başlamadan önceki haliyle görür.

Uyarı

Etkileşimli işlemler, etkileşimli olmayan işlemlerden daha muhafazakar çakışma algılama kullanır ve tablo düzeyinde (koşulsuz eklemeler dışında) çakışabilir. Satır düzeyi çakışma algılaması için etkileşimsiz işlemleri kullanın (BEGIN ATOMIC ... END;).

  1. Yalıtımını doğrulamak için, eğer yoksa örnek tabloyu oluşturun.
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
  1. Aynı oturumda bir işlem başlatın ve bir değişiklik yapın:

    BEGIN TRANSACTION;
    INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);
    
  2. Ayrı bir SQL Düzenleyicisi sekmesinde veya not defteri oturumunda (aynı not defterindeki yeni bir hücrede değil), tabloyu sorgulayın:

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    Kaydedilmemiş değişiklik ilk oturumunuzun dışında görünmediğinden bu 0 satır döndürür.

  3. İlk oturumunuza dönün ve değişiklikleri kaydedin.

    COMMIT;
    
  4. İkinci oturumdan yeniden sorgula:

    -- Run this in the SECOND session
    SELECT * FROM sample_accounts WHERE id = 10;
    

    İşlem tamamlandığından satır görünür.

Bu yalıtım, diğer kullanıcıların geri alınabilecek verileri okumasını engeller.

İşlem modu seçme

Scenario Önerilen mod
Zamanlanmış ETL görevleri Etkileşimli olmayan— otomatik işleme veya geri alma, hata işlemeyi basitleştirir
Sabit deyim dizileri Etkileşimli olmayan—daha basit söz dizimi, el ile işleme gerekmez
İşlemeden önce veri doğrulama Etkileşimli— sonuçları inceleyin ve işlemeye karar verin
El ile denetime ihtiyaç duyan JDBC uygulamaları Etkileşimli— standart veritabanı işlem desenleri

Sonraki Adımlar