Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Önemli
Unity Kataloğu yönetilen Delta tablolarına yazan işlemler Genel Önizleme aşamasındadır.
Unity Kataloğu tarafından yönetilen Iceberg tablolarına yazılan işlemler Özel Önizleme aşamasındadır. Bu önizlemeye katılmak için yönetilen Iceberg tabloları önizleme kayıt formunu gönderin.
İşlemler iki modu destekler: etkileşimli olmayan ve etkileşimli olmayan. Bu sayfa, her modun ne zaman kullanılacağını kapsar ve uygulama örnekleri içerir.
Gereksinimler ve işlemlere genel bakış için bkz. İşlemler. Her iki modla uygulamalı alıştırma için bkz . Öğretici: Tablolar arasında işlemleri koordine edin.
Uyarı
Birden çok deyimli, çok tablolu bir işlemde yazılan tüm tablolar şu koşulları sağlamalıdır:
- Unity Kataloğu tarafından yönetilen tablolar (Delta veya Iceberg) olun
- Katalog tarafından yönetilen işlemelerin etkinleştirilmesi
Etkileşimli olmayan işlemler
Etkileşimli olmayan işlemler, anahtar sözcüğüyle ATOMIC kullanır.
ATOMIC bileşik deyimi bloğu, tüm deyimleri tek bir atomik birim olarak çalıştırır. Herkes ya birlikte başarılı olur ya da birlikte başarısız olur.
Desteklenen işlem: Databricks Runtime 18.0 ve üzerini çalıştıran herhangi bir SQL ambarı, sunucusuz işlem veya küme .
Desteklenen söz dizimi: SQL, Scala spark.sql blokları ve PySpark spark.sql bloklarını destekler.
Uyarı
Yapılandırılmış Akış’lar içindeki forEachBatch öğesini spark.sql("BEGIN ATOMIC ... END;") çağırarak etkileşimli olmayan işlemleri kullanabilirsiniz. Ancak, Yapılandırılmış Akış denetim noktaları işlemsel olarak ilerlemez.
Sözdizimi
BEGIN ATOMIC
statement1;
statement2;
statement3;
END;
Tüm deyimler başarılı olursa Azure Databricks tüm değişiklikleri otomatik olarak kaydeder. Herhangi bir ifade başarısız olursa Azure Databricks tüm değişiklikleri otomatik olarak geri alma işlemi gerçekleştirir.
SQL düzenleyicisinde kullanma
Etkileşimli olmayan işlemleri doğrudan SQL Düzenleyicisi'nde çalıştırın. ATOMIC bileşik deyimi bloğunun tamamını seçin ve tek bir deyim olarak çalıştırın:
BEGIN ATOMIC
DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;
INSERT INTO staging_sales
SELECT * FROM raw_sales WHERE load_date = current_date();
MERGE INTO sales AS target
USING staging_sales AS source
ON target.sale_id = source.sale_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
END;
Not defterlerinde kullanma
SQL hücrelerini veya programlı API'leri kullanarak not defterlerinde etkileşimli olmayan işlemler çalıştırın.
SQL
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
Python
spark.sql("""
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
""")
Scala
spark.sql("""
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
""")
Zamanlanmış işlerde kullanma
Etkileşimli olmayan işlemler, işleme ve geri alma işlemlerini otomatik olarak işlediği için zamanlanmış işlerde düzgün çalışır:
BEGIN ATOMIC
-- Clear previous staging data
DELETE FROM staging_daily_sales WHERE load_date = current_date();
-- Load new data
INSERT INTO staging_daily_sales
SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
FROM raw_sales
WHERE sale_date = current_date() - INTERVAL 1 DAY;
-- Validate row count (fails transaction if no data)
IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
END IF;
-- Merge into production
MERGE INTO daily_sales AS target
USING staging_daily_sales AS source
ON target.sale_id = source.sale_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
END;
Doğrulama dahil olmak üzere herhangi bir ifade başarısız olursa, tüm işlem otomatik olarak geri alınır.
JDBC ile kullanma
Dış istemciler etkileşimli olmayan işlemler çalıştırabilir.
JDBC
String sql = """
BEGIN ATOMIC
INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
END;
""";
Statement stmt = conn.createStatement();
stmt.execute(sql);
Deyim Yürütme API'siyle kullanma
Deyim Yürütme API'sini kullanarak etkileşimli olmayan işlemler çalıştırın:
import requests
sql = """
BEGIN ATOMIC
INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""
response = requests.post(
f"{workspace_url}/api/2.0/sql/statements",
headers={"Authorization": f"Bearer {token}"},
json={
"warehouse_id": warehouse_id,
"statement": sql,
"wait_timeout": "30s"
}
)
ETL desenleri
Aşağıdaki desenler etkileşimli olmayan işlemler kullanan yaygın ETL iş akışlarını gösterir.
Aşama ve doğrulama şablonu
Bu düzen verileri hazırlama alanına yükler, veri kalitesini doğrular ve doğrulanmış kayıtları üretim tablolarıyla birleştirir:
BEGIN ATOMIC
-- Load into staging
INSERT INTO staging_customers
SELECT * FROM external_source
WHERE ingest_date = current_date();
-- Validate data quality
IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
END IF;
-- Merge validated data
MERGE INTO customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Update metadata
UPDATE etl_metadata
SET last_load_date = current_date(),
rows_processed = (SELECT COUNT(*) FROM staging_customers)
WHERE table_name = 'customers';
END;
Boyut ve bilgi tablosu deseni
Bu düzen bilgi tutarlılığını korumak için olgu tablolarını yüklemeden önce boyut tablolarını güncelleştirir:
BEGIN ATOMIC
-- Update dimension tables first
MERGE INTO dim_products AS target
USING staging_products AS source
ON target.product_id = source.product_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
MERGE INTO dim_customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Then load fact table with foreign key references
INSERT INTO fact_sales
SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
FROM staging_sales s
JOIN dim_products p ON s.product_id = p.product_id
JOIN dim_customers c ON s.customer_id = c.customer_id;
END;
Hata işleme
Bir BEGIN ATOMIC ... END; bloğu içinde bir deyim başarısız olduğunda, Azure Databricks yapılan tüm değişiklikleri geri alır ve bir hata mesajı verir.
Hata ayıklama ipuçları:
- Hangi deyimin başarısız olduğunu belirlemek için hata iletisini gözden geçirin.
- İfadeleri transaction blokunun dışında ayrı ayrı test edin.
- Özel hata iletileriyle başarısız olmak için kullanarak
SIGNALdoğrulama denetimleri ekleyin. - Ek bağlam için işlem geçmişini sorgulayın.
Etkileşimli işlemler
Etkileşimli işlemler size işlem sınırları üzerinde açık denetim sağlar. Bir işlemi el ile başlatır, deyimleri çalıştırır ve açıkça işler veya geri alırsınız.
Desteklenen hesaplama: Yalnızca SQL veri ambarları.
Desteklenen söz dizimi: Yalnızca SQL.
Sözdizimi
BEGIN TRANSACTION;
statement1;
statement2;
COMMIT;
-- or: ROLLBACK;
İşlemeden önce doğrulama
İşlemeden önce sonuçları doğrulamak için etkileşimli işlemleri kullanın:
BEGIN TRANSACTION;
-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();
-- Validate and commit or rollback
BEGIN
DECLARE duplicate_count INT;
SET duplicate_count = (
SELECT COUNT(*) FROM (
SELECT customer_id, COUNT(*) as cnt
FROM staging_customers
WHERE load_date = current_date()
GROUP BY customer_id
HAVING COUNT(*) > 1
)
);
IF duplicate_count > 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
ELSE
MERGE INTO customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
COMMIT;
END IF;
END;
Belirgin geri alma
Doğrulama başarısız olduğunda veya iş mantığı değişiklikleri atılması gerektiğinde işlemi geri alın:
BEGIN TRANSACTION;
UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;
-- Check if quantity would go negative
BEGIN
DECLARE new_quantity INT;
SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);
IF new_quantity < 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
ELSE
COMMIT;
END IF;
END;
JDBC ile kullanma
JDBC sürücüsü, executeUpdate() kullanarak işlemler içinde DML deyimlerini çalıştırmayı destekler. Desteklenen DML deyimlerinin listesi için bkz . Desteklenen işlemler.
JDBC istemcileri, otomatik işleme modunu devre dışı bırakarak etkileşimli işlemler kullanır:
Connection conn = DriverManager.getConnection(jdbcUrl, properties);
try {
conn.setAutoCommit(false); // Start transaction mode
Statement stmt = conn.createStatement();
stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");
conn.commit(); // Commit the transaction
} catch (SQLException e) {
conn.rollback(); // Roll back on error
throw e;
} finally {
conn.close();
}
Desteklenmeyen JDBC işlemleri
Aşağıdaki JDBC işlemleri etkileşimli işlemlerde desteklenmez:
| Kategori | Desteklenmiyor |
|---|---|
| Katalog veya şema değiştirme |
Connection.setCatalog() ve Connection.setSchema() |
| Oturum yapılandırma değişiklikleri |
Connection.setClientInfo() gibi oturum düzeyi özellikleri için TIMEZONE ve ANSI_MODE |
| Tüm DatabaseMetaData (tüm protokoller) | Tüm DatabaseMetaData.* yöntemler |
| PreparedStatement meta verileri | PreparedStatement.getMetaData() |
| Saklanan prosedürler | CALL procedure_name() |
ODBC ile kullanma
ODBC sürücüsü, işlemler içinde SQLExecute() ve SQLExecDirect() kullanarak DML deyimlerinin çalıştırılmasını destekler. Desteklenen DML deyimlerinin listesi için bkz . Desteklenen işlemler.
ODBC istemcileri, standart ODBC işlem yönetimi işlevlerini kullanarak Azure Databricks ODBC sürücüsüyle etkileşimli işlemler kullanabilir.
Desteklenmeyen ODBC işlemleri
Aşağıdaki ODBC işlemleri etkileşimli işlemlerde desteklenmez:
| Kategori | Desteklenmiyor |
|---|---|
| Tüm katalog işlevleri |
SQLTables, SQLColumns, SQLStatistics, SQLSpecialColumns, SQLPrimaryKeys, SQLForeignKeys, SQLTablePrivileges, SQLColumnPrivileges, SQLProcedures, SQLProcedureColumns |
| Bağlantı özniteliklerini ayarlama | Katalog değiştirmeleri, yalıtım düzeyi değişiklikleri ve erişim modu değişiklikleri SQLSetConnectAttr() kullanılarak yapılır. |
| SQL çevirisi | SQLNativeSql |
Python için Databricks SQL Bağlayıcısı ile kullanma
Python için Databricks SQL Bağlayıcısı, işlemlerin içinde kullanarak cursor.execute() DML deyimlerinin çalıştırılmasını destekler. Desteklenen DML deyimlerinin listesi için bkz . Desteklenen işlemler.
Python uygulamaları, autocommit=False ayarlarını yapılandırarak etkileşimli işlemler kullanabilir:
from databricks import sql
with sql.connect(
server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
http_path="sql/1.0/warehouses/abc123def456",
access_token="your-access-token",
autocommit=False
) as connection:
with connection.cursor() as cursor:
cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
connection.commit()
Desteklenmeyen Python bağlayıcısı işlemleri
Aşağıdaki Python bağlayıcısı işlemleri etkileşimli işlemlerde desteklenmez:
| Kategori | Desteklenmiyor |
|---|---|
| Tüm meta veriler |
cursor.catalogs(), cursor.schemas(), cursor.tables(), cursor.columns() |
Etkileşimli işlemler için sürücü sınırlamaları
Etkileşimli işlemler kullanılırken tüm sürücüler için aşağıdaki sınırlamalar geçerlidir.
Meta veri işlemleri etkileşimli işlemlerde desteklenmez. Aşağıdaki işlemler, sürücü veya protokolden bağımsız olarak bir işlem içinde başarısız olabilir:
| Sürücü/Protokol | Türü | Methods |
|---|---|---|
| JDBC | DatabaseMetaData |
getCatalogs(), getSchemas(), getTables(), getColumns(), , getTypeInfo() |
| ODBC | Katalog işlevleri |
SQLTables, SQLColumns, SQLGetTypeInfo |
| Python bağlayıcısı | Meta veri yöntemleri |
cursor.catalogs(), cursor.schemas(), cursor.tables(), cursor.columns() |
| SQL | Meta veri komutları |
SHOW TABLES, SHOW DATABASES, DESCRIBE TABLE, USE CATALOG, , USE SCHEMA |
| SQL | information_schema |
SELECT tablolar üzerinde information_schema sorgular |
tüm meta veri işlemlerini işlemlerin dışında çalıştırın.
Uyarı
Tek bir sürücü bağlantı nesnesinde birden çok iş parçacığı üzerinde işlem çalıştırmak tanımsız davranışa neden olur. Her bağlantı nesnesinde aynı anda yalnızca bir işlem çalıştırın.
Yalıtım davranışı
Etkileşimli bir işlemdeki kaydedilmemiş değişiklikler yalnızca oturumunuz tarafından görülebilir. Diğer oturumlar, tablo durumunu işleminiz başlamadan önceki haliyle görür.
Uyarı
Etkileşimli işlemler, etkileşimli olmayan işlemlerden daha muhafazakar çakışma algılama kullanır ve tablo düzeyinde (koşulsuz eklemeler dışında) çakışabilir. Satır düzeyi çakışma algılaması için etkileşimsiz işlemleri kullanın (BEGIN ATOMIC ... END;).
- Yalıtımını doğrulamak için, eğer yoksa örnek tabloyu oluşturun.
CREATE TABLE IF NOT EXISTS sample_accounts (
id INT,
account_name STRING,
balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
Aynı oturumda bir işlem başlatın ve bir değişiklik yapın:
BEGIN TRANSACTION; INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);Ayrı bir SQL Düzenleyicisi sekmesinde veya not defteri oturumunda (aynı not defterindeki yeni bir hücrede değil), tabloyu sorgulayın:
-- Run this in the SECOND session SELECT * FROM sample_accounts WHERE id = 10;Kaydedilmemiş değişiklik ilk oturumunuzun dışında görünmediğinden bu 0 satır döndürür.
İlk oturumunuza dönün ve değişiklikleri kaydedin.
COMMIT;İkinci oturumdan yeniden sorgula:
-- Run this in the SECOND session SELECT * FROM sample_accounts WHERE id = 10;İşlem tamamlandığından satır görünür.
Bu yalıtım, diğer kullanıcıların geri alınabilecek verileri okumasını engeller.
İşlem modu seçme
| Scenario | Önerilen mod |
|---|---|
| Zamanlanmış ETL görevleri | Etkileşimli olmayan— otomatik işleme veya geri alma, hata işlemeyi basitleştirir |
| Sabit deyim dizileri | Etkileşimli olmayan—daha basit söz dizimi, el ile işleme gerekmez |
| İşlemeden önce veri doğrulama | Etkileşimli— sonuçları inceleyin ve işlemeye karar verin |
| El ile denetime ihtiyaç duyan JDBC uygulamaları | Etkileşimli— standart veritabanı işlem desenleri |
Sonraki Adımlar
- Öğretici: Tablolar arasında işlemleri koordine edin
- İşlemler
- Katalog tarafından yönetilen taahhütler
- Yalıtım düzeyleri ve yazma çakışmaları
İlgili SQL kaynak
- ATOMIC bileşik deyimi (etkileşimli olmayan işlemler): Otomatik işleme ve geri alma ile birden çok SQL deyimini tek bir atomik işlem olarak çalıştırın.
- BEGIN TRANSACTION (etkileşimli işlemler):El ile işleme ve geri alma denetimiyle etkileşimli bir işlem başlatın.
- COMMIT: Etkileşimli bir işlem gerçekleştirin ve tüm değişiklikleri kalıcı hale getirin.
- ROLLBACK: Etkileşimli bir işlemi geri alın ve tüm değişiklikleri atın.