Aracılığıyla paylaş


Öğretici: Tablolar arasında işlemleri koordine edin

Önemli

Unity Kataloğu yönetilen Delta tablolarına yazan işlemler Genel Önizleme aşamasındadır.

Unity Kataloğu tarafından yönetilen Iceberg tablolarına yazılan işlemler Özel Önizleme aşamasındadır. Bu önizlemeye katılmak için yönetilen Iceberg tabloları önizleme kayıt formunu gönderin.

Bu öğreticide, birden çok deyim ve tablo arasında güncelleştirmeleri koordine etmek için işlemlerin nasıl kullanılacağı gösterilmektedir. Her iki işlem modunu da öğrenirsiniz: otomatik olarak işleyen etkileşimli olmayan işlemler ve size açık denetim sağlayan etkileşimli işlemler. Öğreticide ayrıca Azure Databricks üzerinde görev açısından kritik depolama iş yükleri oluşturmak için saklı yordamlar ve SQL Betiği ile işlemlerin kullanılması gösterilmektedir.

Gereksinimler

  • Ortam: Azure Databricks çalışma alanına erişim.
  • İşlem: Desteklenen işlem türleri işlem moduna göre değişir:
    • Klasik veya sunucusuz SQL ambarı her iki işlem modunu da destekler.
    • Sunucusuz işlem yalnızca etkileşimli olmayan işlemleri destekler.
    • Databricks Runtime 18.0 veya üzerini çalıştıran klasik kümeler yalnızca etkileşimli olmayan işlemleri destekler.
  • Ayrıcalıklar: CREATE TABLEUnity Kataloğu şemasında.

Örnek tabloları ayarlama

Birden çok deyimli, çok tablolu bir işlemde yazılan tüm tablolar şu koşulları sağlamalıdır:

SQL Düzenleyicisi'nde veya not defterinde iki örnek tablo oluşturun:

-- Create a table for account data
CREATE TABLE IF NOT EXISTS sample_accounts (
  id INT,
  account_name STRING,
  balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- Create a table for transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
  id INT,
  account_id INT,
  transaction_type STRING,
  amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
  'delta.feature.catalogManaged' = 'supported'
);

-- To upgrade an existing table, use:
-- ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

-- Insert sample data
INSERT INTO sample_accounts VALUES
  (1, 'Alice', 1000.00),
  (2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
  (1, 1, 'deposit', 100.00);

Kurulumu doğrulayın:

SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

Çıktı:

sample_accounts:
id	account_name	balance
1	  Alice	        1000.00
2	  Bob	          500.00

sample_transactions:
id	account_id	transaction_type	amount
1	           1	         deposit	100.00

Etkileşimli olmayan işlemler

Etkileşimli olmayan işlemler söz dizimi kullanır BEGIN ATOMIC ... END; . Tüm ifadeler tek atomik birim olarak çalışır. Her komut başarılı olursa Azure Databricks otomatik olarak taahhüt eder. Herhangi bir ifade başarısız olursa Azure Databricks tüm değişiklikleri otomatik olarak geri alır. Ayrıntılı söz dizimi ve kullanım desenleri için bkz. etkileşimli olmayan işlemler.

Başarılı bir işlem çalıştırma

İki tabloyu da atomik olarak güncelleştirin:

BEGIN ATOMIC
  -- Update Alice's account balance
  UPDATE sample_accounts
  SET balance = balance + 100.00
  WHERE id = 1;

  -- Record the deposit transaction
  INSERT INTO sample_transactions
  VALUES (2, 1, 'deposit', 100.00);
END;

her iki işlemin de başarılı olduğunu doğrulayın:

-- Alice's balance should now be 1100.00
SELECT * FROM sample_accounts WHERE id = 1;

-- Should show two transaction records
SELECT * FROM sample_transactions;

Hem bakiye güncelleştirmesi hem de işlem kaydı birlikte oluşturuldu. Eğer iki ifadeden biri başarısız olsaydı, hiçbir değişiklik işlenmeyecekti ve yan etkiler olmadan Databricks işlemi sonlandıracaktı.

Bir koşuldaki bir işlemi başarısız yapmak için SIGNAL kullanma

BEGIN ATOMIC ... END; bloğu içinde SIGNAL kullanarak, kullanıcı tanımlı bir koşul karşılanmadığında işlemin başarısız olmasını sağlayabilirsiniz. Bu, işlemeden önce veri doğrulaması için kullanışlıdır:

BEGIN ATOMIC
  -- Insert new account
  INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

  -- Fail the transaction if balance is negative
  IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
    SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
  END IF;
END;

bir SIGNAL hata oluşturur ve bu da işlemin tamamının otomatik olarak geri alınmasına neden olur. Eklemenin geri alındığı doğrulayın:

-- Should return 0 rows (the transaction was rolled back by SIGNAL)
SELECT * FROM sample_accounts WHERE id = 3;

Bkz. hatada otomatik geri alma

Geçersiz bir ifadeyle bir işlem yürütün:

BEGIN ATOMIC
  -- This statement is valid
  INSERT INTO sample_accounts VALUES (4, 'David', 300.00);

  -- This statement will fail (table does not exist)
  INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

İşlem bir hatayla başarısız oluyor. İlk deyimin geri alındığı doğrulayın:

-- Should return 0 rows because the transaction was rolled back
SELECT * FROM sample_accounts WHERE id = 4;

İlk INSERT deyim geçerli olsa da, ikinci deyim başarısız olduğundan geri alındı. Bu, işlemlerin tümünün veya hiçbir şeyin garantisini gösterir.

Etkileşimli işlemler

Etkileşimli işlemler, işleme veya geri alma zamanları üzerinde size açık denetim sağlar. Begin TRANSACTION komutunu kullanarak başlatın, sonra değişiklikleri kaydetmek için COMMIT'i veya bunları atmak için ROLLBACK'i kullanın.

Değişiklikleri işleme

İşlem başlatma:

BEGIN TRANSACTION;

Değişiklikleri yapın (henüz uygulanmadı)

INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

Değişiklikleri kalıcı hale getirmek için uygulayın.

COMMIT;

Değişiklikleri doğrulayın:

-- Eve's account should now be visible
SELECT * FROM sample_accounts WHERE id = 5;

-- Bob's balance should be 550.00 (500 + 50)
SELECT * FROM sample_accounts WHERE id = 2;

Değişiklikleri geri alma

Yeni bir işlem başlatın:

BEGIN TRANSACTION;

Değişiklik yapın:

INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

Değişikliğin oturumunuzda görünür olduğunu doğrulayın:

-- Should show Frank's account (visible in your session only)
SELECT * FROM sample_accounts WHERE id = 6;

Değişikliği iptal etmek için geri alın.

ROLLBACK;

Değişikliğin atıldığını doğrulayın:

-- Should return 0 rows (the insert was rolled back)
SELECT * FROM sample_accounts WHERE id = 6;

Saklı yordamlar ve SQL Betiği ile kullanma

Yeniden kullanılabilir işlem mantığı oluşturmak için işlemleri saklı yordamlarla birleştirebilirsiniz. Bu düzen, sık çalıştırdığınız karmaşık işlemler için kullanışlıdır.

  1. Katalog tarafından yönetilen işlemelerin etkinleştirildiği iki tablo oluşturma

    
    CREATE TABLE orders (order_id STRING, item_sku STRING, quantity_ordered INT)
     TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    CREATE TABLE inventory (item_sku STRING, quantity_in_stock INT)
     TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
    
    
  2. Saklı yordamı tanımlayın

    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
        IN  p_order_id      STRING,
        IN  p_customer_id   STRING,
        IN  p_order_amount  DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
        -- Insert the order
        INSERT INTO main.retail.orders (order_id, customer_id, amount)
        VALUES (p_order_id, p_customer_id, p_order_amount);
    
        -- Update total sales per customer
        MERGE INTO main.retail.total_sales AS t
        USING (
            SELECT
              p_customer_id  AS customer_id,
              p_order_amount AS order_amount
        ) s
          ON t.customer_id = s.customer_id
        WHEN MATCHED THEN
          UPDATE SET t.total_amount = t.total_amount + s.order_amount
        WHEN NOT MATCHED THEN
          INSERT (customer_id, total_amount)
          VALUES (s.customer_id, s.order_amount);
    END;
    
    
  3. İşlemi tanımlama

    BEGIN ATOMIC
        -- Staging batch id for this transaction
        DECLARE new_order_id STRING DEFAULT uuid();
        DECLARE v_batch_id STRING DEFAULT uuid();
    
        -- 1) Stage incoming customer and order rows
        INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
        VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);
    
        -- 2) Drive final writes from staging to production via stored procedure
        FOR o AS
          SELECT
            order_id,
            customer_id,
            amount
          FROM main.retail.orders_staging
          WHERE batch_id = v_batch_id
        DO
            CALL main.retail.apply_order(
              o.order_id,
              o.customer_id,
              o.amount
            );
        END FOR;
    
        -- 3) Clean up processed staging rows
        DELETE FROM main.retail.orders_staging
        WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction
    
    

İşlemin herhangi bir bölümü başarısız olursa Databricks tüm değişiklikleri otomatik olarak geri alır.

Temizlemek

Örnek tabloları kaldırın:

DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;


DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS inventory;

Sonraki Adımlar