Row-level concurrency reduces conflicts between concurrent write operations by detecting changes at the row level and automatically resolving conflicts that occur when concurrent writes update or delete different rows in the same data file.
Requirements for row-level concurrency
Row-level concurrency is automatically enabled when all of the following requirements are met:
- The workload runs on Databricks Runtime 14.3 LTS or above.
- The source table doesn't use partitions.
- The source table has deletion vectors enabled. See Deletion vectors in Databricks.
Partitioned tables don't allow row-level concurrency. However, when deletion vectors are enabled, partitioned tables can still avoid conflicts between OPTIMIZE and write operations. See Limitations for row-level concurrency.
For Databricks Runtime versions before 14.3 LTS, see Row-level concurrency legacy behavior.
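The three requirements combine with a logical AND, so a single unmet condition disables row-level concurrency. The sketch below encodes them as a predicate, for example for use in tests or internal tooling. `TableSetup`, `RowLevelRequirements`, and the (major, minor) version encoding are hypothetical illustrations, not a Databricks API; the runtime makes the real decision.

```scala
// Hypothetical model of the documented requirements; the runtime makes the real decision.
final case class TableSetup(
    runtimeMajor: Int,          // e.g. 14 for Databricks Runtime 14.3 LTS
    runtimeMinor: Int,          // e.g. 3 for Databricks Runtime 14.3 LTS
    partitioned: Boolean,
    deletionVectorsEnabled: Boolean
)

object RowLevelRequirements {
  // True for Databricks Runtime 14.3 and any later version.
  private def atLeast14_3(major: Int, minor: Int): Boolean =
    major > 14 || (major == 14 && minor >= 3)

  // All three documented requirements must hold at once.
  def rowLevelConcurrencyEnabled(t: TableSetup): Boolean =
    atLeast14_3(t.runtimeMajor, t.runtimeMinor) && !t.partitioned && t.deletionVectorsEnabled
}
```

For example, an unpartitioned table with deletion vectors on Databricks Runtime 15.0 satisfies the predicate, while the same table on 14.2 does not.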
Conflict matrix with row-level concurrency
For source tables with row-level concurrency, the following table shows which pairs of write operations can conflict in each isolation level:
| Operation | INSERT (1) | UPDATE, DELETE, MERGE INTO | OPTIMIZE |
|---|---|---|---|
| INSERT | Cannot conflict | ||
| UPDATE, DELETE, MERGE INTO | Cannot conflict in WriteSerializable. Can conflict in Serializable when modifying the same row. | Can conflict when modifying the same row. | |
| OPTIMIZE | Cannot conflict | Can conflict when ZORDER BY is used. Cannot conflict otherwise. | Can conflict when ZORDER BY is used. Cannot conflict otherwise. |
(1) All INSERT operations in this table describe append operations that don't contain subqueries that read data from the same table. INSERT operations containing subqueries that read from the same table support the same concurrency as MERGE.
Note
- Tables with identity columns don't support concurrent transactions. See Use identity columns in Delta Lake.
- REORG operations have isolation semantics identical to OPTIMIZE when rewriting data files. When you use REORG to apply an upgrade, table protocols change, which conflicts with all ongoing operations.
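One way to read the matrix is as a symmetric lookup over pairs of operations. The sketch below encodes the row-level table under stated assumptions: all type and object names are hypothetical, `sameRow` stands in for the table's "modifying the same row" condition, an OPTIMIZE pair is treated as using ZORDER BY if either side uses it, and `true` means only that the pair can conflict, not that it will.

```scala
// Illustrative model of the conflict matrix for tables WITH row-level concurrency.
// Not a Databricks API: the names and the `sameRow` flag are assumptions.
sealed trait Isolation
case object WriteSerializableIso extends Isolation
case object SerializableIso extends Isolation

sealed trait WriteOp
// An append that does not read the same table (footnote 1).
case object BlindInsert extends WriteOp
// UPDATE, DELETE, MERGE INTO, or an INSERT whose subquery reads the same table.
case object Modify extends WriteOp
final case class Optimize(zOrder: Boolean) extends WriteOp

object RowLevelMatrix {
  // True when the documented matrix says the pair *can* conflict.
  // Assumption: an OPTIMIZE pair counts as "ZORDER BY is used" if either side uses it.
  def canConflict(a: WriteOp, b: WriteOp, iso: Isolation, sameRow: Boolean): Boolean =
    (a, b) match {
      case (BlindInsert, BlindInsert)                              => false
      case (BlindInsert, Modify) | (Modify, BlindInsert)           => iso == SerializableIso && sameRow
      case (Modify, Modify)                                        => sameRow
      case (BlindInsert, Optimize(_)) | (Optimize(_), BlindInsert) => false
      case (Optimize(z), Modify)                                   => z
      case (Modify, Optimize(z))                                   => z
      case (Optimize(z1), Optimize(z2))                            => z1 || z2
    }
}
```

Because the cells are symmetric, each mixed pair is matched in both orders, so callers don't need to sort the two operations first.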
Write conflicts without row-level concurrency
For source tables without row-level concurrency, the following table shows which pairs of write operations can conflict in each isolation level:
| Operation | INSERT (1) | UPDATE, DELETE, MERGE INTO | OPTIMIZE |
|---|---|---|---|
| INSERT | Cannot conflict | ||
| UPDATE, DELETE, MERGE INTO | Cannot conflict in WriteSerializable. Can conflict in Serializable. See Avoid conflicts using partitioning. | Can conflict in Serializable and WriteSerializable. See Avoid conflicts using partitioning. | |
| OPTIMIZE | Cannot conflict | Cannot conflict in tables with deletion vectors enabled, unless ZORDER BY is used. Can conflict otherwise. | Cannot conflict in tables with deletion vectors enabled, unless ZORDER BY is used. Can conflict otherwise. |
(1) All INSERT operations in this table describe append operations that don't contain subqueries that read data from the same table. INSERT operations containing subqueries that read from the same table support the same concurrency as MERGE.
Note
- Tables with identity columns don't support concurrent transactions. See Use identity columns in Delta Lake.
- REORG operations have isolation semantics identical to OPTIMIZE when rewriting data files. When you use REORG to apply an upgrade, table protocols change and conflict with all ongoing operations.
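Without row-level concurrency, conflicts are detected per file rather than per row. The sketch below encodes this table together with the file-overlap rule described in Avoid conflicts using partitioning. All names are hypothetical: `deletionVectors` reflects whether the table has deletion vectors enabled, `sharedFiles` says whether both operations touch at least one common data file, an OPTIMIZE pair is assumed to use ZORDER BY if either side does, and `true` means only that the pair can conflict.

```scala
// Illustrative model of the matrix for tables WITHOUT row-level concurrency.
// Not a Databricks API: names and flags are assumptions.
sealed trait Isolation
case object WriteSerializableIso extends Isolation
case object SerializableIso extends Isolation

sealed trait WriteOp
case object BlindInsert extends WriteOp            // append without a self-reading subquery
case object Modify extends WriteOp                 // UPDATE, DELETE, MERGE INTO
final case class Optimize(zOrder: Boolean) extends WriteOp

object FileLevelMatrix {
  // OPTIMIZE cells: no conflict with deletion vectors enabled unless ZORDER BY is used.
  private def optimizeCell(deletionVectors: Boolean, zOrder: Boolean): Boolean =
    !deletionVectors || zOrder

  // A "can conflict" cell only produces a conflict if the two file sets overlap.
  def canConflict(a: WriteOp, b: WriteOp, iso: Isolation,
                  deletionVectors: Boolean, sharedFiles: Boolean): Boolean = {
    val cellAllowsConflict = (a, b) match {
      case (BlindInsert, BlindInsert)                              => false
      case (BlindInsert, Modify) | (Modify, BlindInsert)           => iso == SerializableIso
      case (Modify, Modify)                                        => true
      case (BlindInsert, Optimize(_)) | (Optimize(_), BlindInsert) => false
      case (Optimize(z), Modify)                                   => optimizeCell(deletionVectors, z)
      case (Modify, Optimize(z))                                   => optimizeCell(deletionVectors, z)
      case (Optimize(z1), Optimize(z2))                            => optimizeCell(deletionVectors, z1 || z2)
    }
    cellAllowsConflict && sharedFiles
  }
}
```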
Limitations for row-level concurrency
Row-level concurrency has the following limitations. For these operations, conflicts are detected with the standard behavior for write conflicts. See Write conflicts without row-level concurrency.
| Limitation | Description |
|---|---|
| Complex conditional clauses | Conditions on complex data types (structs, arrays, maps), non-deterministic expressions, subqueries, and correlated subqueries |
| MERGE predicate requirement | In Databricks Runtime 14.2, MERGE commands must use an explicit predicate on the target table to filter rows matching the source table |
| Performance tradeoff | Row-level conflict detection can increase total execution time. With many concurrent transactions, the writer prioritizes latency over conflict resolution |
All limitations for deletion vectors also apply. See Limitations.
Avoid conflicts using partitioning
For all cases marked "can conflict" in the conflict matrices, a conflict occurs only if the two operations affect the same set of files. To make two sets of files disjoint, partition the table by the same columns used in operation conditions.
Example:
The commands UPDATE table WHERE date > '2010-01-01' ... and DELETE table WHERE date < '2010-01-01' conflict if the table is not partitioned by date, because both can attempt to modify the same files. Partitioning the table by date avoids the conflict.
Note
Partitioning a table by a column with high cardinality can lead to performance issues due to the large number of subdirectories.
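The date example can be made concrete with a small model of which files each predicate may touch. This is a hypothetical sketch, not how Delta plans file pruning: it assumes that in an unpartitioned table nothing lets a predicate rule a file out (it ignores file statistics and data skipping), while a table partitioned by `date` lets each predicate select only the matching partitions.

```scala
// Hypothetical model, not Delta's planner: ignores file statistics and data skipping.
object PartitionOverlap {
  // A data file and the value of its `date` partition (None if the table is unpartitioned).
  final case class DataFile(datePartition: Option[String], name: String)

  // Files a predicate on `date` might touch. Without a partition value there is
  // nothing to prune on, so every file is a candidate.
  def candidateFiles(files: Seq[DataFile], pred: String => Boolean): Set[DataFile] =
    files.filter(f => f.datePartition.forall(pred)).toSet

  // A "can conflict" pair only conflicts when the two file sets overlap.
  def mayConflict(a: Set[DataFile], b: Set[DataFile]): Boolean =
    a.intersect(b).nonEmpty

  // UPDATE ... WHERE date > '2010-01-01' versus DELETE ... WHERE date < '2010-01-01'.
  // ISO dates (yyyy-MM-dd) compare correctly as strings.
  def exampleConflicts(files: Seq[DataFile]): Boolean =
    mayConflict(
      candidateFiles(files, _.compareTo("2010-01-01") > 0),
      candidateFiles(files, _.compareTo("2010-01-01") < 0)
    )
}
```

With two unpartitioned files, both predicates select both files and `exampleConflicts` returns true; with one file in a `2009-12-31` partition and one in a `2010-06-01` partition, the selected sets are disjoint and it returns false.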
Avoid conflicts with explicit partition filters
A ConcurrentAppendException is often raised during concurrent DELETE, UPDATE, or MERGE operations: even when the operations update different partitions, one of them might read the partition that the other is updating. Make the separation explicit in the operation condition:
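```scala
import io.delta.tables.DeltaTable

// Assumes deltaTable (a DeltaTable partitioned by date and country), source
// (a DataFrame of updates), and the String partition values date and country
// are already defined.

// Problem: the condition can scan the entire table, so this MERGE may read
// partitions that a concurrent operation is updating
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

// Solution: add explicit partition filters on the target table so the MERGE
// reads only the partition it writes
deltaTable.as("t").merge(
    source.as("s"),
    s"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '$date' AND t.country = '$country'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()
```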
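If many jobs need the partition-pinned condition, a small helper can build it in one place. `MergeConditions` and `partitionPinnedCondition` are hypothetical names. Instead of escaping, the helper validates the partition values against conservative patterns (an assumed `yyyy-MM-dd` date and an alphanumeric country code), so a stray quote in an input value cannot change the predicate.

```scala
object MergeConditions {
  // Join keys from the example; the target is assumed partitioned by date and country.
  private val JoinCondition =
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country"

  // Appends explicit target-side partition filters so the MERGE reads one partition.
  // Throws IllegalArgumentException if a value doesn't match the assumed format.
  def partitionPinnedCondition(date: String, country: String): String = {
    require(date.matches("\\d{4}-\\d{2}-\\d{2}"), s"unexpected date value: $date")
    require(country.matches("[A-Za-z0-9_]+"), s"unexpected country value: $country")
    s"$JoinCondition AND t.date = '$date' AND t.country = '$country'"
  }
}
```

The returned string can be passed as the condition argument of `deltaTable.as("t").merge(source.as("s"), ...)` in place of the inline condition in the solution above.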
Conflict exceptions
When a transaction conflict occurs, you observe one of the following exceptions:
ConcurrentAppendException
This exception occurs when a concurrent operation adds files in the same partition (or anywhere in an unpartitioned table) that your operation reads. The file additions can be caused by INSERT, DELETE, UPDATE, or MERGE operations.
With the default WriteSerializable isolation level, files added by INSERT operations that append data without reading any data don't conflict with any operation. If the isolation level is Serializable, any appends may conflict.
Important
INSERT operations can conflict in WriteSerializable mode if multiple concurrent DELETE, UPDATE, or MERGE operations might reference values appended by the INSERT operation. To avoid this:
- Ensure concurrent DELETE, UPDATE, or MERGE operations don't read the appended data.
- Have at most one DELETE, UPDATE, or MERGE operation that can read the appended data.
ConcurrentDeleteReadException
This exception occurs when a concurrent operation deletes a file that your operation read. Common causes are DELETE, UPDATE, or MERGE operations that rewrite files.
ConcurrentDeleteDeleteException
This exception occurs when a concurrent operation deletes a file that your operation also deletes. This could be caused by two concurrent compaction operations rewriting the same files.
MetadataChangedException
This exception occurs when a concurrent transaction updates the metadata of a Delta table. Common causes are ALTER TABLE operations or writes that update the table schema.
ConcurrentTransactionException
This exception occurs if a streaming query using the same checkpoint location is started multiple times concurrently and tries to write to the Delta table at the same time. Never run two streaming queries with the same checkpoint location concurrently.
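Within a single driver process, you can make the one-query-per-checkpoint rule explicit with a registry that refuses a second start for the same location. `CheckpointGuard` is a hypothetical helper, not part of Spark or Delta Lake: it only sees queries started through it in the same JVM, so it does not protect against a duplicate query launched from another cluster or job.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical in-process guard; it cannot see queries started elsewhere.
object CheckpointGuard {
  private val active = ConcurrentHashMap.newKeySet[String]()

  // Runs `start` only if no other query in this JVM holds the checkpoint location.
  // If `start` throws, the location is released before the error propagates.
  def startExclusive[T](checkpointLocation: String)(start: => T): T = {
    if (!active.add(checkpointLocation))
      throw new IllegalStateException(s"checkpoint already in use: $checkpointLocation")
    try start
    catch { case e: Throwable => active.remove(checkpointLocation); throw e }
  }

  // Call after the query that used this checkpoint has stopped.
  // Returns true if the location was registered.
  def release(checkpointLocation: String): Boolean = active.remove(checkpointLocation)
}
```

In practice, the `start` body would be the `writeStream` chain that sets `option("checkpointLocation", path)` and calls `start()`, and `release(path)` would run once that query has terminated.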
ProtocolChangedException
This exception can occur when:
- Your Delta Lake table is upgraded to a new protocol version (you might need to upgrade your Databricks Runtime)
- Multiple writers are creating or replacing a table at the same time
- Multiple writers are writing to an empty path at the same time
See Delta Lake feature compatibility and protocols.
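A common response to the data conflicts above is to re-run the whole operation so that it re-reads the current table state. The failed transaction did not commit, so nothing from it needs to be undone. The sketch below assumes that only ConcurrentAppendException, ConcurrentDeleteReadException, and ConcurrentDeleteDeleteException are worth retrying, on the view that the others usually point to a schema, protocol, or deployment issue that a retry won't fix. It matches exceptions by class name so it compiles without a Delta Lake dependency. `ConflictRetry` and its parameters are hypothetical.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object ConflictRetry {
  // Assumption: only these data conflicts are retried. Class names are matched as
  // strings so the sketch compiles without a Delta Lake dependency.
  val RetryableConflicts: Set[String] = Set(
    "ConcurrentAppendException",
    "ConcurrentDeleteReadException",
    "ConcurrentDeleteDeleteException"
  )

  // True if the exception's fully qualified class name ends with a retryable name.
  def isRetryableConflict(t: Throwable): Boolean = {
    val name = t.getClass.getName
    RetryableConflicts.exists(c => name.endsWith(c))
  }

  // Re-runs the whole operation (which re-reads the table) up to maxAttempts times,
  // sleeping backoffMs * attempt between tries. Other failures are rethrown at once.
  def withConflictRetry[T](
      maxAttempts: Int,
      backoffMs: Long = 0L,
      isRetryable: Throwable => Boolean = isRetryableConflict
  )(operation: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")

    @tailrec
    def loop(attempt: Int): T =
      Try(operation) match {
        case Success(value) => value
        case Failure(e) if isRetryable(e) && attempt < maxAttempts =>
          if (backoffMs > 0) Thread.sleep(backoffMs * attempt)
          loop(attempt + 1)
        case Failure(e) => throw e
      }

    loop(1)
  }
}
```

Because `operation` is passed by name, each attempt re-executes the full DELETE, UPDATE, or MERGE rather than reusing a stale plan.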
Row-level concurrency legacy behavior
In Databricks Runtime 13.3 LTS, row-level concurrency uses legacy behavior:
- Requires deletion vectors. See Deletion vectors in Databricks.
- Tables with liquid clustering automatically enable row-level concurrency.