Row-level concurrency

Row-level concurrency reduces conflicts between concurrent write operations by detecting changes at the row level and automatically resolving conflicts that occur when concurrent writes update or delete different rows in the same data file.

Requirements for row-level concurrency

Row-level concurrency is automatically enabled when all of the following requirements are met:

  • You are using Databricks Runtime 14.3 LTS or above.
  • The source table doesn't use partitions.
  • The source table has deletion vectors enabled. See Deletion vectors in Databricks.

Partitioned tables don't allow row-level concurrency. However, when deletion vectors are enabled, partitioned tables can still avoid conflicts between OPTIMIZE and write operations. See Limitations for row-level concurrency.
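
The three requirements above can be read as a single predicate. The following is a minimal sketch of that rule, not a Databricks API: `TableInfo`, its fields, and `RowLevelConcurrency.isEnabled` are hypothetical names introduced for illustration, and the legacy behavior of earlier runtimes is not modeled.

```scala
// Sketch only: TableInfo and its fields are hypothetical, not a Databricks API.
final case class TableInfo(
    runtimeVersion: String,        // for example "14.3 LTS"
    partitionColumns: Seq[String], // empty for an unpartitioned table
    deletionVectorsEnabled: Boolean)

object RowLevelConcurrency {
  // Parses the leading "major.minor" part; a suffix such as " LTS" is ignored.
  private def parseVersion(v: String): (Int, Int) = {
    val parts = v.trim.takeWhile(c => c.isDigit || c == '.').split('.')
    (parts(0).toInt, if (parts.length > 1) parts(1).toInt else 0)
  }

  // True only when all three requirements listed above hold.
  def isEnabled(t: TableInfo): Boolean = {
    val (major, minor) = parseVersion(t.runtimeVersion)
    val runtimeOk = major > 14 || (major == 14 && minor >= 3)
    runtimeOk && t.partitionColumns.isEmpty && t.deletionVectorsEnabled
  }
}
```

For example, `RowLevelConcurrency.isEnabled(TableInfo("14.3 LTS", Seq.empty, true))` evaluates to `true`, while adding a partition column or disabling deletion vectors makes it `false`.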

For Databricks Runtime versions before 14.3 LTS, see Row-level concurrency legacy behavior.

Conflict matrix with row-level concurrency

For source tables with row-level concurrency, the following table shows which pairs of write operations can conflict in each isolation level:

| Operation | INSERT (1) | UPDATE, DELETE, MERGE INTO | OPTIMIZE |
|---|---|---|---|
| INSERT | Cannot conflict | | |
| UPDATE, DELETE, MERGE INTO | Cannot conflict in WriteSerializable. Can conflict in Serializable when modifying the same row. | Can conflict when modifying the same row. | |
| OPTIMIZE | Cannot conflict | Can conflict when ZORDER BY is used. Cannot conflict otherwise. | Can conflict when ZORDER BY is used. Cannot conflict otherwise. |

(1) All INSERT operations in this table describe append operations that don't contain subqueries that read data from the same table. INSERT operations containing subqueries that read from the same table support the same concurrency as MERGE.
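
The matrix for tables with row-level concurrency can also be written as a lookup function, which makes its symmetry explicit. This is only an illustrative encoding of the cells in this section: the `ConflictMatrix` object and its type names are hypothetical, `Insert` stands for the plain appends described in footnote (1), and `Modify` groups UPDATE, DELETE, and MERGE INTO.

```scala
// Sketch only: an encoding of the matrix above, not part of any Delta Lake API.
object ConflictMatrix {
  sealed trait Op
  case object Insert extends Op   // plain append, no subquery on the same table
  case object Modify extends Op   // UPDATE, DELETE, MERGE INTO
  case object Optimize extends Op

  sealed trait Isolation
  case object WriteSerializableLevel extends Isolation
  case object SerializableLevel extends Isolation

  // Returns the matrix cell for an unordered pair of operations.
  def outcome(a: Op, b: Op, isolation: Isolation): String = (a, b) match {
    case (Insert, Insert) => "cannot conflict"
    case (Insert, Modify) | (Modify, Insert) =>
      if (isolation == SerializableLevel) "can conflict when modifying the same row"
      else "cannot conflict"
    case (Modify, Modify) => "can conflict when modifying the same row"
    case (Insert, Optimize) | (Optimize, Insert) => "cannot conflict"
    case (Optimize, _) | (_, Optimize) => "can conflict only when ZORDER BY is used"
  }
}
```

Only the INSERT versus UPDATE, DELETE, MERGE INTO cell depends on the isolation level; every other cell is the same in WriteSerializable and Serializable.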

Note

  • Tables with identity columns don't support concurrent transactions. See Use identity columns in Delta Lake.
  • REORG operations have isolation semantics identical to OPTIMIZE when rewriting data files. When you use REORG to apply an upgrade, table protocols change, which conflicts with all ongoing operations.

Write conflicts without row-level concurrency

For source tables without row-level concurrency, the following table shows which pairs of write operations can conflict in each isolation level:

| Operation | INSERT (1) | UPDATE, DELETE, MERGE INTO | OPTIMIZE |
|---|---|---|---|
| INSERT | Cannot conflict | | |
| UPDATE, DELETE, MERGE INTO | Cannot conflict in WriteSerializable. Can conflict in Serializable. See Avoid conflicts using partitioning. | Can conflict in Serializable and WriteSerializable. See Avoid conflicts using partitioning. | |
| OPTIMIZE | Cannot conflict | Cannot conflict in tables with deletion vectors enabled, unless ZORDER BY is used. Can conflict otherwise. | Cannot conflict in tables with deletion vectors enabled, unless ZORDER BY is used. Can conflict otherwise. |

(1) All INSERT operations in this table describe append operations that don't contain subqueries that read data from the same table. INSERT operations containing subqueries that read from the same table support the same concurrency as MERGE.

Note

  • Tables with identity columns don't support concurrent transactions. See Use identity columns in Delta Lake.
  • REORG operations have isolation semantics identical to OPTIMIZE when rewriting data files. When you use REORG to apply an upgrade, table protocols change, which conflicts with all ongoing operations.

Limitations for row-level concurrency

Row-level concurrency has the following limitations. For operations affected by them, conflict resolution falls back to the behavior for tables without row-level concurrency. See Write conflicts without row-level concurrency.

| Limitation | Description |
|---|---|
| Complex conditional clauses | Conditions on complex data types (structs, arrays, maps), non-deterministic expressions, subqueries, and correlated subqueries |
| MERGE predicate requirement | In Databricks Runtime 14.2, MERGE commands must use an explicit predicate on the target table to filter rows matching the source table |
| Performance tradeoff | Row-level conflict detection can increase total execution time. With many concurrent transactions, the writer prioritizes latency over conflict resolution |

All limitations for deletion vectors also apply. See Limitations.

Avoid conflicts using partitioning

For all cases marked "can conflict" in the conflict matrices, a conflict occurs only if the two operations affect the same set of files. To make two sets of files disjoint, partition the table by the same columns used in operation conditions.

Example:

The commands UPDATE table WHERE date > '2010-01-01' ... and DELETE FROM table WHERE date < '2010-01-01' conflict if the table is not partitioned by date, because both can attempt to modify the same files. Partitioning the table by date avoids the conflict because each command then touches only files in its own partitions.
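
To see why partitioning helps, the example above can be modeled at file level. The sketch below is a toy model for a table without row-level concurrency, not how Delta Lake tracks files: `DataFile`, the file names, and the sample dates are all made up for illustration. It computes which files each command has to touch and intersects the two sets.

```scala
// Sketch only: a toy file-level model with made-up file names and dates.
object PartitionOverlap {
  // A data file and the date values of the rows it holds.
  final case class DataFile(name: String, dates: Set[String])

  // Files an operation has to touch: those holding at least one matching row.
  def touched(files: Seq[DataFile])(matches: String => Boolean): Set[String] =
    files.filter(_.dates.exists(matches)).map(_.name).toSet

  // Files touched by both commands; a non-empty result means they can conflict.
  def overlap(files: Seq[DataFile]): Set[String] = {
    val updated = touched(files)(_ > "2010-01-01") // UPDATE ... WHERE date > '2010-01-01'
    val deleted = touched(files)(_ < "2010-01-01") // DELETE ... WHERE date < '2010-01-01'
    updated intersect deleted
  }

  // Unpartitioned: one file can mix dates from both sides of the boundary.
  val unpartitioned: Seq[DataFile] = Seq(
    DataFile("part-0", Set("2009-12-31", "2010-01-02")),
    DataFile("part-1", Set("2010-01-01", "2010-03-15")))

  // Partitioned by date: every file holds exactly one date value.
  val partitionedByDate: Seq[DataFile] = Seq(
    DataFile("date=2009-12-31/part-0", Set("2009-12-31")),
    DataFile("date=2010-01-01/part-0", Set("2010-01-01")),
    DataFile("date=2010-01-02/part-0", Set("2010-01-02")),
    DataFile("date=2010-03-15/part-0", Set("2010-03-15")))
}
```

In this model, `overlap(unpartitioned)` contains `part-0`, which both commands would rewrite, while `overlap(partitionedByDate)` is empty. The ISO date strings are compared lexicographically, which matches chronological order for this format.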

Note

Partitioning a table by a column with high cardinality can lead to performance issues due to the large number of subdirectories.

Avoid conflicts with explicit partition filters

A conflict exception, typically ConcurrentAppendException, is often raised during concurrent DELETE, UPDATE, or MERGE operations that might read the same partition even when they update different partitions. To avoid it, make the separation explicit in the operation condition:

// Problem: Condition can scan the entire table
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

// Solution: Add explicit partition filters
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + date + "' AND t.country = '" + country + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()
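
The solution above builds the condition by concatenating strings. As a hedged variation, the condition can be assembled by a small helper that also rejects values that could break out of the SQL string literal. `MergeConditions` and `forPartition` are hypothetical names, and rejecting quotes and backslashes is a simplifying assumption rather than a complete escaping scheme.

```scala
// Sketch only: a hypothetical helper, not part of the Delta Lake API.
object MergeConditions {
  // Rejects characters that could terminate or escape a SQL string literal.
  private def literal(value: String): String = {
    require(!value.exists(c => c == '\'' || c == '\\'), s"unsafe literal: $value")
    s"'$value'"
  }

  // Join keys plus explicit filters that pin the target to one partition.
  def forPartition(date: String, country: String): String =
    Seq(
      "s.user_id = t.user_id",
      "s.date = t.date",
      "s.country = t.country",
      s"t.date = ${literal(date)}",
      s"t.country = ${literal(country)}"
    ).mkString(" AND ")
}
```

The resulting string can be passed as the condition argument of `merge` in place of the concatenated expression, for example `MergeConditions.forPartition(date, country)`.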

Conflict exceptions

When a transaction conflict occurs, you observe one of the following exceptions:

ConcurrentAppendException

This exception occurs when a concurrent operation adds files in the same partition (or anywhere in an unpartitioned table) that your operation reads. The file additions can be caused by INSERT, DELETE, UPDATE, or MERGE operations.

With the default WriteSerializable isolation level, files added by INSERT operations that append data without reading any data don't conflict with any operation. If the isolation level is Serializable, any appends may conflict.

Important

INSERT operations can conflict in WriteSerializable mode if multiple concurrent DELETE, UPDATE, or MERGE operations might reference values appended by the INSERT operation. To avoid this:

  • Ensure concurrent DELETE, UPDATE, or MERGE operations don't read the appended data
  • Have at most one DELETE, UPDATE, or MERGE operation that can read the appended data

ConcurrentDeleteReadException

This exception occurs when a concurrent operation deletes a file that your operation read. Common causes are DELETE, UPDATE, or MERGE operations that rewrite files.

ConcurrentDeleteDeleteException

This exception occurs when a concurrent operation deletes a file that your operation also deletes. This could be caused by two concurrent compaction operations rewriting the same files.

MetadataChangedException

This exception occurs when a concurrent transaction updates the metadata of a Delta table. Common causes are ALTER TABLE operations or writes that update the table schema.

ConcurrentTransactionException

This exception occurs if a streaming query using the same checkpoint location is started multiple times concurrently and tries to write to the Delta table at the same time. Never run two streaming queries with the same checkpoint location concurrently.

ProtocolChangedException

This exception can occur when:

  • Your Delta Lake table is upgraded to a new protocol version (you might need to upgrade your Databricks Runtime)
  • Multiple writers are creating or replacing a table at the same time
  • Multiple writers are writing to an empty path at the same time

See Delta Lake feature compatibility and protocols.
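
One common way to handle the conflict exceptions described in this section, when the conflict itself cannot be avoided, is to retry the failed operation after a short pause. The sketch below is an assumption about how such a wrapper might look, not guidance from this page: `ConflictRetry`, `withRetry`, and the choice of which exceptions count as retryable are hypothetical. It matches on the class name so that it compiles without a Delta Lake dependency.

```scala
import scala.annotation.tailrec

// Sketch only: a hypothetical retry wrapper, not part of the Delta Lake API.
object ConflictRetry {
  // Assumption: only these three exceptions are treated as retryable here.
  val retryable: Set[String] = Set(
    "ConcurrentAppendException",
    "ConcurrentDeleteReadException",
    "ConcurrentDeleteDeleteException")

  // Matches on the class name so the sketch needs no Delta dependency.
  def isRetryable(e: Throwable): Boolean =
    retryable.exists(name => e.getClass.getName.contains(name))

  // Runs op, retrying retryable failures with a linearly growing pause.
  // After maxAttempts attempts, the last exception propagates to the caller.
  @tailrec
  def withRetry[T](maxAttempts: Int, backoffMillis: Long = 100L, attempt: Int = 1)(op: () => T): T = {
    val result: Either[Exception, T] =
      try Right(op())
      catch {
        case e: Exception if isRetryable(e) && attempt < maxAttempts => Left(e)
      }
    result match {
      case Right(value) => value
      case Left(_) =>
        Thread.sleep(backoffMillis * attempt)
        withRetry(maxAttempts, backoffMillis, attempt + 1)(op)
    }
  }
}
```

A call such as `ConflictRetry.withRetry(maxAttempts = 3)(() => runMerge())`, where `runMerge` is a placeholder for your own write, re-runs the whole operation, so the operation must be safe to execute more than once. ConcurrentTransactionException is deliberately left out, because retrying does not fix two streaming queries sharing a checkpoint location.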

Row-level concurrency legacy behavior

In Databricks Runtime 13.3 LTS, row-level concurrency uses legacy behavior:

Next steps