Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ważna
Transakcje zapisywane w tabelach Delta zarządzanych przez Unity Catalog są dostępne w wersji próbnej.
Transakcje, które zapisują do zarządzanych tabel Iceberg w Unity Catalog, są dostępne w prywatnym podglądzie. Aby dołączyć do tej wersji zapoznawczej, prześlij formularz rejestracji w wersji zapoznawczej zarządzanych tabel Iceberg.
Transakcje obsługują dwa tryby: nieinterakcyjny i interaktywny. Ta strona zawiera informacje o tym, kiedy należy używać każdego trybu i zawiera przykłady implementacji.
Aby zapoznać się z wymaganiami i omówieniem transakcji, zobacz Transakcje. Aby zapoznać się z praktycznymi ćwiczeniami w obu trybach, zobacz Samouczek: koordynowanie transakcji między tabelami.
Uwaga / Notatka
Wszystkie tabele zapisywane w transakcji obejmującej wiele instrukcji i wiele tabel muszą:
- Tabela zarządzana przez Unity Catalog (Delta lub Iceberg)
- Włącz zatwierdzenia zarządzane przez katalog
Transakcje nieinterakcyjne
Transakcje nieinterakcyjne używają skryptów SQL ze ATOMIC słowem kluczowym . Blok instrukcji złożonej ATOMIC uruchamia wszystkie instrukcje jako pojedynczą jednostkę atomową. Wszyscy razem odnoszą sukces lub wszyscy razem ponoszą porażkę.
Obsługiwane zasoby obliczeniowe: dowolny magazyn SQL,bezserwerowe zasoby obliczeniowe lub klaster z uruchomionym środowiskiem Databricks Runtime 18.0 lub nowszym.
Obsługiwana składnia: obsługuje bloki SQL, Scala spark.sql i bloki PySpark spark.sql .
Uwaga / Notatka
Transakcje bez interakcji można używać w ramach Strukturalnego przesyłania strumieniowego forEachBatch poprzez wywołanie funkcji spark.sql("BEGIN ATOMIC ... END;"). Jednak punkty kontrolne Structured Streaming nie są rozwijane transakcyjnie.
Składnia
BEGIN ATOMIC
statement1;
statement2;
statement3;
END;
Jeśli wszystkie wyrażenia się powiodą, usługa Azure Databricks automatycznie zatwierdza wszystkie zmiany. Jeśli jakakolwiek instrukcja nie powiedzie się, usługa Azure Databricks automatycznie wycofa wszystkie zmiany.
Używanie w edytorze SQL
Uruchamiaj transakcje nieinterakcyjne bezpośrednio w edytorze SQL. Wybierz cały blok instrukcji złożonej ATOMIC i uruchom go jako pojedynczą instrukcję:
BEGIN ATOMIC
DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;
INSERT INTO staging_sales
SELECT * FROM raw_sales WHERE load_date = current_date();
MERGE INTO sales AS target
USING staging_sales AS source
ON target.sale_id = source.sale_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
END;
Używanie w laptopach
Uruchamianie transakcji nieinterakcyjnych w notesach przy użyciu komórek SQL lub programowych interfejsów API.
SQL
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
Python
spark.sql("""
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
""")
Scala
spark.sql("""
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
""")
Używanie w zaplanowanych zadaniach
Transakcje nieinterakcyjne działają dobrze w zaplanowanych zadaniach, ponieważ automatycznie obsługują zatwierdzanie i wycofywanie:
BEGIN ATOMIC
-- Clear previous staging data
DELETE FROM staging_daily_sales WHERE load_date = current_date();
-- Load new data
INSERT INTO staging_daily_sales
SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
FROM raw_sales
WHERE sale_date = current_date() - INTERVAL 1 DAY;
-- Validate row count (fails transaction if no data)
IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
END IF;
-- Merge into production
MERGE INTO daily_sales AS target
USING staging_daily_sales AS source
ON target.sale_id = source.sale_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
END;
Jeśli jakakolwiek instrukcja nie powiedzie się, w tym asercji, cała transakcja zostanie wycofana automatycznie.
Używanie z funkcją JDBC
Klienci zewnętrzni mogą uruchamiać transakcje nieinterakcyjne.
JDBC
String sql = """
BEGIN ATOMIC
INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
END;
""";
Statement stmt = conn.createStatement();
stmt.execute(sql);
Użycie z interfejsem API do wykonywania poleceń
Uruchom transakcje nieinterakcyjne przy użyciu interfejsu API wykonywania instrukcji:
import requests
sql = """
BEGIN ATOMIC
INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""
response = requests.post(
f"{workspace_url}/api/2.0/sql/statements",
headers={"Authorization": f"Bearer {token}"},
json={
"warehouse_id": warehouse_id,
"statement": sql,
"wait_timeout": "30s"
}
)
Wzorce ETL
Poniższe wzorce przedstawiają typowe przepływy pracy ETL korzystające z transakcji nieinterakcyjnych.
Wzorzec etapowania i walidacji
Ten wzorzec ładuje dane do obszaru przejściowego, weryfikuje jakość danych i scala zweryfikowane rekordy z tabelami produkcyjnymi:
BEGIN ATOMIC
-- Load into staging
INSERT INTO staging_customers
SELECT * FROM external_source
WHERE ingest_date = current_date();
-- Validate data quality
IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
END IF;
-- Merge validated data
MERGE INTO customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Update metadata
UPDATE etl_metadata
SET last_load_date = current_date(),
rows_processed = (SELECT COUNT(*) FROM staging_customers)
WHERE table_name = 'customers';
END;
Wzorzec tabel wymiarów i faktów
Ten wzorzec aktualizuje tabele wymiarów przed załadowaniem tabel faktów w celu zachowania integralności referencyjnej:
BEGIN ATOMIC
-- Update dimension tables first
MERGE INTO dim_products AS target
USING staging_products AS source
ON target.product_id = source.product_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
MERGE INTO dim_customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Then load fact table with foreign key references
INSERT INTO fact_sales
SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
FROM staging_sales s
JOIN dim_products p ON s.product_id = p.product_id
JOIN dim_customers c ON s.customer_id = c.customer_id;
END;
Obsługa błędów
Gdy instrukcja w bloku BEGIN ATOMIC ... END; zakończy się niepowodzeniem, usługa Azure Databricks wycofa wszystkie zmiany i zwróci komunikat o błędzie.
Porady dotyczące debugowania:
- Przejrzyj komunikat o błędzie, aby określić, która instrukcja nie powiodła się.
- Testuj instrukcje indywidualnie poza blokiem transakcji.
- Dodaj sprawdzanie poprawności przy użyciu polecenia
SIGNAL, aby zakończyć się niepowodzeniem z niestandardowymi komunikatami o błędach. - Wykonywanie zapytań dotyczących historii transakcji w celu uzyskania dodatkowego kontekstu.
Transakcje interakcyjne
Transakcje interakcyjne zapewniają jawną kontrolę nad granicami transakcji. Należy ręcznie rozpocząć transakcję, uruchomić instrukcje i jawnie zatwierdzić lub wycofać.
Obsługiwane zasoby obliczeniowe: tylko magazyny SQL.
Obsługiwana składnia: tylko sql.
Składnia
BEGIN TRANSACTION;
statement1;
statement2;
COMMIT;
-- or: ROLLBACK;
Zweryfikuj poprawność przed zatwierdzeniem
Użyj transakcji interakcyjnych, aby zweryfikować wyniki przed zatwierdzeniem:
BEGIN TRANSACTION;
-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();
-- Validate and commit or rollback
BEGIN
DECLARE duplicate_count INT;
SET duplicate_count = (
SELECT COUNT(*) FROM (
SELECT customer_id, COUNT(*) as cnt
FROM staging_customers
WHERE load_date = current_date()
GROUP BY customer_id
HAVING COUNT(*) > 1
)
);
IF duplicate_count > 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
ELSE
MERGE INTO customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
COMMIT;
END IF;
END;
Jawne wycofywanie
Wycofywanie transakcji w przypadku niepowodzenia walidacji lub logika biznesowa wymaga odrzucenia zmian:
BEGIN TRANSACTION;
UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;
-- Check if quantity would go negative
BEGIN
DECLARE new_quantity INT;
SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);
IF new_quantity < 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
ELSE
COMMIT;
END IF;
END;
Używanie z funkcją JDBC
Sterownik JDBC obsługuje uruchamianie instrukcji DML przy użyciu executeUpdate() w ramach transakcji. Aby uzyskać listę obsługiwanych instrukcji DML, zobacz Obsługiwane operacje.
Klienci JDBC używają transakcji interakcyjnych, wyłączając tryb automatycznego zatwierdzania:
Connection conn = DriverManager.getConnection(jdbcUrl, properties);
try {
conn.setAutoCommit(false); // Start transaction mode
Statement stmt = conn.createStatement();
stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");
conn.commit(); // Commit the transaction
} catch (SQLException e) {
conn.rollback(); // Roll back on error
throw e;
} finally {
conn.close();
}
Nieobsługiwane operacje JDBC
Następujące operacje JDBC nie są obsługiwane w ramach transakcji interakcyjnych:
| Kategoria | Niewspierane |
|---|---|
| Przełączanie wykazu lub schematu |
Connection.setCatalog() i Connection.setSchema() |
| Zmiany konfiguracji sesji |
Connection.setClientInfo() dla właściwości na poziomie sesji, takich jak TIMEZONE i ANSI_MODE |
| Wszystkie metadaneBazyDanych (wszystkie protokoły) | Wszystkie DatabaseMetaData.* metody |
| Metadane PreparedStatement | PreparedStatement.getMetaData() |
| Procedury przechowywane | CALL procedure_name() |
Używanie z ODBC
Sterownik ODBC obsługuje wykonywanie instrukcji DML przy użyciu SQLExecute() i SQLExecDirect() w ramach transakcji. Aby uzyskać listę obsługiwanych instrukcji DML, zobacz Obsługiwane operacje.
Klienci ODBC mogą używać interakcyjnych transakcji ze sterownikiem ODBC usługi Azure Databricks przy użyciu standardowych funkcji zarządzania transakcjami ODBC.
Nieobsługiwane operacje ODBC
Następujące operacje ODBC nie są obsługiwane w ramach transakcji interakcyjnych:
| Kategoria | Niewspierane |
|---|---|
| Wszystkie funkcje wykazu |
SQLTables, SQLColumns, SQLStatistics, SQLSpecialColumns, SQLPrimaryKeys, SQLForeignKeys, SQLTablePrivileges, SQLColumnPrivileges, SQLProcedures, SQLProcedureColumns |
| Ustawianie atrybutów połączenia | Przełączanie katalogu, zmiany na poziomie izolacji i zmiany trybu dostępu za pomocą SQLSetConnectAttr() |
| Tłumaczenie SQL | SQLNativeSql |
Używanie z łącznikiem SQL usługi Databricks dla języka Python
Łącznik SQL usługi Databricks dla języka Python obsługuje uruchamianie instrukcji DML przy użyciu cursor.execute() w ramach transakcji. Aby uzyskać listę obsługiwanych instrukcji DML, zobacz Obsługiwane operacje.
Aplikacje języka Python mogą używać interakcyjnych transakcji z łącznikiem SQL usługi Databricks dla języka Python, ustawiając autocommit=False:
from databricks import sql
with sql.connect(
server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
http_path="sql/1.0/warehouses/abc123def456",
access_token="your-access-token",
autocommit=False
) as connection:
with connection.cursor() as cursor:
cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
connection.commit()
Nieobsługiwane operacje łącznika języka Python
Następujące operacje łącznika języka Python nie są obsługiwane w ramach transakcji interaktywnych:
| Kategoria | Niewspierane |
|---|---|
| Wszystkie metadane |
cursor.catalogs(), , cursor.schemas(), , cursor.tables()cursor.columns() |
Ograniczenia sterowników dotyczące transakcji interakcyjnych
Następujące ograniczenia dotyczą wszystkich sterowników podczas korzystania z transakcji interaktywnych.
Operacje metadanych nie są obsługiwane wewnątrz transakcji interaktywnych. Następujące operacje mogą zakończyć się niepowodzeniem w ramach transakcji niezależnie od sterownika lub protokołu:
| Sterownik/protokół | Typ | Methods |
|---|---|---|
| JDBC | DatabaseMetaData |
getCatalogs(), , getSchemas(), getTables(), , getColumns()getTypeInfo() |
| ODBC | Funkcje wykazu |
SQLTables
SQLColumns
SQLGetTypeInfo
|
| Łącznik języka Python | Metody metadanych |
cursor.catalogs(), , cursor.schemas(), , cursor.tables()cursor.columns() |
| SQL | Polecenia metadanych |
SHOW TABLES, , SHOW DATABASES, DESCRIBE TABLE, , USE CATALOGUSE SCHEMA |
| SQL | information_schema |
SELECT zapytania względem information_schema tabel |
Uruchom wszystkie operacje metadanych poza transakcjami.
Ostrzeżenie
Uruchamianie transakcji w wielu wątkach w jednym obiekcie połączenia sterownika prowadzi do niezdefiniowanego zachowania. Uruchom tylko jedną transakcję jednocześnie dla każdego obiektu połączenia.
Zachowanie izolacji
Niezatwierdzone zmiany w transakcji interakcyjnej są widoczne tylko dla sesji. Inne sesje widzą stan tabeli takim, jakim był przed rozpoczęciem transakcji.
Uwaga / Notatka
Transakcje interakcyjne używają bardziej konserwatywnego wykrywania konfliktów niż transakcje nieinterakcyjne i mogą powodować konflikty na poziomie tabeli (z wyjątkiem bezwarunkowych dołączań). W przypadku wykrywania konfliktów na poziomie wiersza użyj transakcji nieinterakcyjnych (BEGIN ATOMIC ... END;).
- Aby zweryfikować izolację, utwórz przykładową tabelę, jeśli nie istnieje:
CREATE TABLE IF NOT EXISTS sample_accounts (
id INT,
account_name STRING,
balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
W tej samej sesji uruchom transakcję i wprowadź zmianę:
BEGIN TRANSACTION; INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);W osobnej karcie edytora SQL lub sesji notesu (a nie nowej komórki w tym samym notesie) wykonaj zapytanie dotyczące tabeli:
-- Run this in the SECOND session SELECT * FROM sample_accounts WHERE id = 10;Spowoduje to zwrócenie 0 wierszy, ponieważ niezatwierdzona zmiana nie jest widoczna poza Twoją pierwszą sesją.
Wróć do pierwszej sesji i zatwierdź:
COMMIT;Ponownie wykonaj zapytanie z drugiej sesji:
-- Run this in the SECOND session SELECT * FROM sample_accounts WHERE id = 10;Wiersz jest widoczny, ponieważ transakcja została zatwierdzona.
Ta izolacja uniemożliwia innym użytkownikom odczytywanie danych, które mogą zostać wycofane.
Wybieranie trybu transakcji
| Scenario | Tryb zalecany |
|---|---|
| Zaplanowane zadania ETL | Nieinterakcyjne — automatyczne zatwierdzanie lub wycofywanie upraszcza obsługę błędów |
| Stałe sekwencje instrukcji | Nieinterakcyjne — prostsza składnia, bez konieczności ręcznego zatwierdzania |
| Walidacja danych przed zatwierdzeniem | Interakcyjne — sprawdzanie wyników i podejmowanie decyzji o zatwierdzeniu |
| Aplikacje JDBC wymagające ręcznej kontroli | Interaktywne — standardowe wzorce transakcji bazy danych |
Następne kroki
- Samouczek: koordynowanie transakcji między tabelami
- Transakcje
- Zatwierdzenia zarządzane przez katalog
- Poziomy izolacji i konflikty zapisu
Powiązane odwołanie SQL
- INSTRUKCJE SKŁADNIOWE ATOMIC (transakcje bez interakcji): Uruchamiaj wiele instrukcji SQL jako pojedynczą transakcję atomową z automatycznym zatwierdzeniem i wycofywaniem.
- BEGIN TRANSACTION (transakcje interakcyjne): Rozpocznij interakcyjną transakcję z ręczną kontrolą zatwierdzania i wycofywania.
- ZATWIERDZENIE: zatwierdź transakcję interaktywną i wprowadź wszystkie zmiany trwałe.
- WYCOFYWANIE: Wycofywanie transakcji interakcyjnej i odrzucanie wszystkich zmian.