Isolation levels and write conflicts on Azure Databricks
The isolation level of a table defines the degree to which a transaction must be isolated from modifications made by concurrent operations. Write conflicts on Azure Databricks depend on the isolation level.
Delta Lake provides ACID transaction guarantees between reads and writes. This means that:
- Multiple writers across multiple clusters can simultaneously modify a table partition. Writers see a consistent snapshot view of the table and writes occur in a serial order.
- Readers continue to see a consistent snapshot view of the table that the Azure Databricks job started with, even when a table is modified during a job.
Azure Databricks uses Delta Lake for all tables by default. This article describes behavior for Delta Lake on Azure Databricks.
When does Delta Lake commit without reading the table?
INSERT or append operations do not read the table state before committing if the following conditions are satisfied:
- Logic is expressed using INSERT SQL logic or append mode.
- Logic contains no subqueries or conditionals that reference the table targeted by the write operation.
As in other commits, Delta Lake validates and resolves the table versions on commit using metadata in the transaction log, but no version of the table is actually read.
Many common patterns use MERGE operations to insert data based on table conditions. Although it might be possible to rewrite this logic using INSERT statements, any reference to conditions of data in the target table triggers the same concurrency limitations as MERGE.
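The two conditions above amount to a simple predicate over a write. The Scala sketch below is a hypothetical model (BlindAppendCheck, Write and its fields are illustrative names, not a Delta Lake API) that classifies a write from a description of it.

```scala
object BlindAppendCheck {
  // Hypothetical description of a write, for illustration only:
  // the target table, whether the logic is an INSERT or an append-mode write,
  // and every table its logic (including subqueries and conditions) refers to.
  final case class Write(target: String, isInsertOrAppend: Boolean, tablesReferenced: Set[String])

  // Both conditions from the list above must hold for the commit to skip reading the table.
  def commitsWithoutReading(w: Write): Boolean =
    w.isInsertOrAppend && !w.tablesReferenced.contains(w.target)
}
```

For example, an INSERT that selects only from a staging table qualifies, while the same INSERT with a subquery that checks the target table for existing keys does not, and is subject to the same limitations as MERGE.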
Write conflicts on Azure Databricks
The following table describes which pairs of write operations can conflict in each isolation level.
| | INSERT (1) | UPDATE, DELETE, MERGE INTO | OPTIMIZE |
|---|---|---|---|
| INSERT | Cannot conflict | | |
| UPDATE, DELETE, MERGE INTO | Can conflict in Serializable, cannot conflict in WriteSerializable | Can conflict in Serializable and WriteSerializable | |
| OPTIMIZE | Cannot conflict | Can conflict in Serializable and WriteSerializable | Can conflict in Serializable and WriteSerializable |
(1) INSERT in the table above refers to append operations that do not read any data from the same table before committing. INSERT operations that contain subqueries reading the same table support the same concurrency as MERGE.
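Because the table is symmetric, it can be read as a lookup on an unordered pair of operations. The Scala sketch below encodes it; the type and method names are hypothetical. A true result only means the pair can conflict: whether it actually does depends on whether the two operations touch the same files.

```scala
object ConflictMatrix {
  sealed trait Op
  case object Insert extends Op    // blind append: reads nothing from the target first
  case object ReadWrite extends Op // UPDATE, DELETE or MERGE INTO
  case object Optimize extends Op

  sealed trait Level
  case object Serializable extends Level
  case object WriteSerializable extends Level

  // Encodes the write-conflict table; the argument order does not matter.
  def canConflict(a: Op, b: Op, level: Level): Boolean = (a, b) match {
    case (Insert, Insert)                          => false
    case (Insert, Optimize) | (Optimize, Insert)   => false
    case (Insert, ReadWrite) | (ReadWrite, Insert) => level == Serializable
    case _                                         => true // remaining pairs: both levels
  }
}
```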
The isolation level of a table defines the degree to which a transaction must be isolated from modifications made by concurrent transactions. Delta Lake on Azure Databricks supports two isolation levels: Serializable and WriteSerializable.
Serializable: The strongest isolation level. It ensures that committed write operations and all reads are serializable. Operations are allowed as long as there exists a serial sequence of executing them one at a time that generates the same outcome as that seen in the table. For write operations, the serial sequence is exactly the same as that seen in the table’s history.
WriteSerializable (Default): A weaker isolation level than Serializable. It ensures only that write operations (that is, not reads) are serializable. However, this is still stronger than snapshot isolation. WriteSerializable is the default isolation level because it provides a good balance of data consistency and availability for most common operations.
In this mode, the content of the Delta table may differ from what the sequence of operations in the table history implies. This is because this mode allows certain pairs of concurrent writes (say, operations X and Y) to proceed such that the result is as if Y was performed before X (that is, serializable between them), even though the history shows that Y was committed after X. To disallow this reordering, set the table isolation level to Serializable, which causes these transactions to fail.
Read operations always use snapshot isolation. The write isolation level determines whether a reader can see a snapshot of a table that, according to the history, “never existed”.
For the Serializable level, a reader always sees only tables that conform to the history. For the WriteSerializable level, a reader could see a table that does not exist in the Delta log.
For example, consider txn1, a long-running delete, and txn2, which inserts data that txn1’s delete condition matches. txn2 and txn1 complete and are recorded in that order in the history. According to the history, the data inserted in txn2 should not exist in the table. At the Serializable level, a reader would never see data inserted by txn2. At the WriteSerializable level, however, a reader could at some point see the data inserted by txn2.
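To make the txn1/txn2 example concrete, here is a toy, in-memory commit log in Scala. It is a hypothetical model, not how Delta Lake implements conflict detection: every transaction is assumed to scan the whole table, and the only rule modeled is the concurrent-append check, with blind appends exempt under WriteSerializable. All names (ReorderingDemo, Table, Commit, scenario) are illustrative.

```scala
object ReorderingDemo {
  sealed trait Level
  case object Serializable extends Level
  case object WriteSerializable extends Level

  // A data file: an id plus the rows it holds (rows are plain Ints in this toy model).
  final case class DataFile(id: String, rows: Set[Int])

  // One commit: files added, ids of files removed, and whether the writer
  // was a blind append (it read nothing from the table first).
  final case class Commit(added: Set[DataFile], removed: Set[String], blind: Boolean)

  final class ConflictException(msg: String) extends RuntimeException(msg)

  final class Table(initial: Set[DataFile]) {
    private var log: Vector[Commit] = Vector(Commit(initial, Set.empty, blind = true))

    def version: Int = log.size - 1

    // Replay the log up to version v to get the live set of files.
    def snapshot(v: Int): Set[DataFile] =
      log.take(v + 1).foldLeft(Set.empty[DataFile]) { (files, c) =>
        files.filterNot(f => c.removed.contains(f.id)) ++ c.added
      }

    def rows: Set[Int] = snapshot(version).flatMap(_.rows)

    // Toy rule: every transaction scans the whole table, so any file added after
    // readVersion is in its read set. WriteSerializable exempts blind appends.
    def commit(c: Commit, readVersion: Int, level: Level): Unit = {
      val concurrent = log.drop(readVersion + 1)
      val conflicting = concurrent.filter { other =>
        other.added.nonEmpty && !(level == WriteSerializable && other.blind)
      }
      if (conflicting.nonEmpty)
        throw new ConflictException("concurrent append touched data this transaction read")
      log = log :+ c
    }
  }

  // txn1: long-running DELETE of rows >= 2. txn2: blind INSERT of row 5 that
  // commits while txn1 is still running. History order: txn2, then txn1.
  def scenario(level: Level): Either[String, Set[Int]] = {
    val table = new Table(Set(DataFile("f1", Set(1, 2, 3))))
    val txn1Read = table.version
    // txn1 plans to replace every file it read with a rewrite that drops rows >= 2.
    val txn1 = Commit(
      added = table.snapshot(txn1Read).map(f => DataFile(f.id + "-rewritten", f.rows.filter(_ < 2))),
      removed = table.snapshot(txn1Read).map(_.id),
      blind = false)
    // txn2 starts and commits at the latest version.
    table.commit(Commit(Set(DataFile("f2", Set(5))), Set.empty, blind = true), table.version, level)
    try {
      table.commit(txn1, txn1Read, level)
      Right(table.rows)
    } catch {
      case e: ConflictException => Left(e.getMessage)
    }
  }
}
```

Under WriteSerializable, scenario returns Right(Set(1, 5)): row 5 survives, which is the outcome of running the delete before the insert, even though the log records the insert first. Under Serializable, txn1 fails with a conflict instead.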
For more information on which types of operations can conflict with each other in each isolation level and the possible errors, see Avoid conflicts using partitioning and disjoint command conditions.
You set the isolation level using the ALTER TABLE command:

```sql
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)
```

For example, to change the isolation level from the default WriteSerializable to Serializable, run:

```sql
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
```
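If you apply the setting from application code, a small helper can restrict the level name to the two values this article documents. The Scala sketch below is hypothetical (IsolationLevelDdl and setIsolationLevel are illustrative names); it only builds the statement text and assumes the table name is already a valid identifier, because it does no quoting or escaping.

```scala
object IsolationLevelDdl {
  // The two isolation levels described in this article.
  val Levels: Set[String] = Set("Serializable", "WriteSerializable")

  // Builds the ALTER TABLE statement; rejects any other level name.
  def setIsolationLevel(table: String, level: String): String = {
    require(Levels.contains(level), s"unknown isolation level: $level")
    s"ALTER TABLE $table SET TBLPROPERTIES ('delta.isolationLevel' = '$level')"
  }
}
```

On a cluster you could then pass the result to spark.sql, for example spark.sql(IsolationLevelDdl.setIsolationLevel("events", "Serializable")), where events is a placeholder table name.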
In all cases marked “can conflict”, whether the two operations will conflict depends on whether they operate on the same set of files. You can make the two sets of files disjoint by partitioning the table by the same columns as those used in the conditions of the operations. For example, the two commands UPDATE table SET ... WHERE date > '2010-01-01' and DELETE FROM table WHERE date < '2010-01-01' will conflict if the table is not partitioned by date, because both can attempt to modify the same set of files. Partitioning the table by date avoids the conflict. Therefore, partitioning a table according to the conditions commonly used in commands can reduce conflicts significantly. However, partitioning a table by a column that has high cardinality can lead to other performance issues due to the large number of subdirectories.
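The file-overlap argument for the UPDATE/DELETE pair can be checked with a toy model. In the Scala sketch below, each file is described only by the distinct date values of its rows, and a command is assumed to rewrite every file that holds at least one matching row. This is a hypothetical simplification, not how Delta Lake tracks the files an operation reads; the file paths are illustrative.

```scala
object PartitionOverlap {
  // A data file and the distinct `date` values of the rows it holds.
  // ISO-8601 date strings sort in the same order as the dates themselves.
  final case class DataFile(path: String, dates: Set[String])

  // Files a command has to rewrite: those holding at least one matching row.
  def touched(files: Seq[DataFile], pred: String => Boolean): Set[String] =
    files.filter(_.dates.exists(pred)).map(_.path).toSet

  // Two commands may conflict only if the sets of files they touch overlap.
  def mayConflict(files: Seq[DataFile], p: String => Boolean, q: String => Boolean): Boolean =
    touched(files, p).intersect(touched(files, q)).nonEmpty

  val updatePred: String => Boolean = d => d.compareTo("2010-01-01") > 0 // WHERE date > '2010-01-01'
  val deletePred: String => Boolean = d => d.compareTo("2010-01-01") < 0 // WHERE date < '2010-01-01'

  // Unpartitioned: one file mixes rows from both sides of the boundary.
  val unpartitioned: Seq[DataFile] = Seq(DataFile("part-00000", Set("2009-12-31", "2010-01-02")))

  // Partitioned by date: each file holds rows for a single date.
  val partitionedByDate: Seq[DataFile] = Seq(
    DataFile("date=2009-12-31/part-00000", Set("2009-12-31")),
    DataFile("date=2010-01-02/part-00000", Set("2010-01-02")))
}
```

With the unpartitioned layout both predicates touch part-00000, so the commands may conflict; with one date per file they touch disjoint files.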
When a transaction conflict occurs, you will observe one of the following exceptions:
This exception occurs when a concurrent operation adds files in the same partition (or anywhere in an unpartitioned table) that your operation reads. The file additions can be caused by
With the default isolation level of WriteSerializable, files added by blind INSERT operations (that is, operations that blindly append data without reading any data) do not conflict with any operation, even if they touch the same partition (or anywhere in an unpartitioned table). If the isolation level is set to Serializable, then blind appends may conflict.
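The rule for concurrent appends can be restated as a small function. The Scala sketch below is hypothetical (AppendConflict, ConcurrentAppend and conflicts are illustrative names); partitions are plain strings, and readPartitions = None stands for an operation that reads the whole table, as in an unpartitioned table.

```scala
object AppendConflict {
  sealed trait Level
  case object Serializable extends Level
  case object WriteSerializable extends Level

  // A concurrent commit that added files; `blind` means it read nothing before writing.
  final case class ConcurrentAppend(partitions: Set[String], blind: Boolean)

  // Does a concurrent append invalidate an operation that read `readPartitions`?
  def conflicts(level: Level, readPartitions: Option[Set[String]], other: ConcurrentAppend): Boolean = {
    // None = the operation read everything, so any added file overlaps its reads.
    val overlaps = readPartitions.forall(read => read.intersect(other.partitions).nonEmpty)
    // Under WriteSerializable, blind appends never conflict.
    val exempt = level == WriteSerializable && other.blind
    overlaps && !exempt
  }
}
```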
This exception is often thrown during concurrent MERGE operations. While the concurrent operations may be physically updating different partition directories, one of them may read the same partition that the other one concurrently updates, thus causing a conflict. You can avoid this by making the separation explicit in the operation condition. Consider the following example.
```scala
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()
```
Suppose you run the above code concurrently for different dates or countries. Since each job works on an independent partition of the target Delta table, you might not expect any conflicts. However, the condition is not explicit enough: the operation can scan the entire table and can therefore conflict with concurrent operations that update any other partition. Instead, you can rewrite your statement to add the specific date and country to the merge condition, as shown in the following example.
```scala
// Target 'deltaTable' is partitioned by date and country.
// `date` and `country` hold the partition values this job writes
// (they stand in for the <date> and <country> placeholders).
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country " +
      s"AND t.date = '$date' AND t.country = '$country'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()
```
This operation is now safe to run concurrently on different dates and countries.
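Because the partition values are spliced into a SQL string, it can help to build the condition in one place and validate the inputs. The Scala sketch below is hypothetical: MergeCondition and forPartition are illustrative names, and the accepted formats (yyyy-MM-dd dates, alphanumeric country codes) are assumptions about the data, not requirements of Delta Lake.

```scala
object MergeCondition {
  // Join condition from the example above, plus explicit predicates that restrict
  // the target side to a single date/country partition.
  def forPartition(date: String, country: String): String = {
    // Validate rather than escape, since the values are spliced into SQL text.
    require(date.matches("""\d{4}-\d{2}-\d{2}"""), s"unexpected date format: $date")
    require(country.matches("[A-Za-z0-9_]+"), s"unexpected country value: $country")
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country " +
      s"AND t.date = '$date' AND t.country = '$country'"
  }
}
```

Each job would then call deltaTable.as("t").merge(source.as("s"), MergeCondition.forPartition(date, country)) with its own partition values.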
This exception occurs when a concurrent operation deletes a file that your operation read. A common cause is a MERGE operation that rewrites files.
This exception occurs when a concurrent operation deletes a file that your operation also deletes. This could be caused by two concurrent compaction operations rewriting the same files.
This exception occurs when a concurrent transaction updates the metadata of a Delta table. Common causes are ALTER TABLE operations or writes to your Delta table that update the schema of the table.
This exception occurs if a streaming query using the same checkpoint location is started multiple times concurrently and tries to write to the Delta table at the same time. You should never have two streaming queries use the same checkpoint location and run at the same time.
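One way to catch a shared checkpoint location before anything starts is a pre-flight check over your deployment configuration. The Scala sketch below is hypothetical: it assumes you keep a map from query name to checkpoint location, and it can only detect duplicates within that map, not queries started elsewhere. The paths used with it are placeholders.

```scala
object CheckpointCheck {
  // queries: query name -> checkpoint location.
  // Returns every location claimed by more than one query, with the names sharing it.
  def sharedCheckpoints(queries: Map[String, String]): Map[String, Set[String]] =
    queries
      .groupBy { case (_, location) => location }
      .collect { case (location, group) if group.size > 1 => location -> group.keys.toSet }
      .toMap
}
```

A non-empty result means at least two queries would write through the same checkpoint and should not run at the same time.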
This exception can occur in the following cases:
- When your Delta table is upgraded to a new protocol version. For future operations to succeed, you may need to upgrade your Databricks Runtime.
- When multiple writers are creating or replacing a table at the same time.
- When multiple writers are writing to an empty path at the same time.
See Table protocol versioning for more details.