Уровни изоляции и конфликты записи в Azure Databricks
Уровень изоляции таблицы определяет степень, до которой транзакция должна быть изолирована от изменений, выполняемых параллельными операциями. Конфликты записи в Azure Databricks зависят от уровня изоляции.
Delta Lake предоставляет гарантии транзакций ACID для операций чтения и записи. Это означает следующее.
- Несколько операций записи в нескольких кластерах могут одновременно изменять секцию таблицы. Записи видят согласованное представление моментального снимка таблицы и записи выполняются в последовательном порядке.
- Читатели продолжают видеть согласованное представление моментального снимка таблицы, с которого началось задание Azure Databricks, даже если таблица была изменена во время выполнения задания.
См. сведения о гарантиях ACID в Azure Databricks?.
Примечание.
Azure Databricks использует Delta Lake для всех таблиц по умолчанию. В этой статье описывается поведение Delta Lake в Azure Databricks.
Внимание
Изменения метаданных вызывают сбой всех одновременных операций записи. Эти операции включают изменения в протокол таблицы, свойства таблицы или схему данных.
Потоковые операции чтения завершаются сбоем при обнаружении фиксации, которая изменяет метаданные таблицы. Чтобы сохранить поток, необходимо перезапустить его. Рекомендуемые методы см . в разделе "Рекомендации по рабочей среде" для структурированной потоковой передачи.
Ниже приведены примеры запросов, которые изменяют метаданные:
-- Set a table property.
ALTER TABLE table-name SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
-- Enable a feature using a table property and update the table protocol.
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);
-- Drop a table feature.
ALTER TABLE table_name DROP FEATURE deletionVectors;
-- Upgrade to UniForm.
REORG TABLE table_name APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2));
-- Update the table schema.
ALTER TABLE table_name ADD COLUMNS (col_name STRING);
Конфликты записи с параллелизмом на уровне строк
Параллелизм на уровне строк уменьшает конфликты между параллельными операциями записи, обнаруживая изменения на уровне строки и автоматически разрешая конфликты, возникающие при одновременном обновлении или удалении разных строк в одном файле данных.
Параллелизм на уровне строк обычно доступен в Databricks Runtime 14.2 и выше. Параллелизм на уровне строк по умолчанию поддерживается для следующих условий:
- Таблицы с включенными векторами удаления и без секционирования.
- Таблицы с отказоустойчивой кластеризации, если вы не отключили векторы удаления.
Таблицы с секциями не поддерживают параллелизм на уровне строк, но по-прежнему могут избежать конфликтов между OPTIMIZE
всеми другими операциями записи при включении векторов удаления. См . ограничения параллелизма на уровне строк.
Сведения о других версиях среды выполнения Databricks см. в разделе "Поведение предварительной версии параллелизма на уровне строк" (устаревшая версия).
MERGE INTO
для поддержки параллелизма на уровне строк требуется Photon в Databricks Runtime 14.2. В Databricks Runtime 14.3 LTS и более поздних версиях Фотон не требуется.
В следующей таблице описывается, какие пары операций записи могут конфликтовывать на каждом уровне изоляции с включенным параллелизмом на уровне строк.
Примечание.
Таблицы с столбцами удостоверений не поддерживают одновременные транзакции. См. статью "Использование столбцов удостоверений" в Delta Lake.
INSERT (1) | UPDATE, DELETE, MERGE INTO | OPTIMIZE | |
---|---|---|---|
INSERT | Не может конфликтовать | ||
UPDATE, DELETE, MERGE INTO | Не удается конфликтуть в WriteSerializable. Может конфликтуть в Сериализуемом режиме при изменении одной строки. См . ограничения параллелизма на уровне строк. | Может конфликтуться при изменении одной строки. См . ограничения параллелизма на уровне строк. | |
OPTIMIZE; | Не может конфликтовать | Может конфликтуться при ZORDER BY использовании. Не удается конфликтуть в противном случае. |
Может конфликтуться при ZORDER BY использовании. Не удается конфликтуть в противном случае. |
Внимание
(1) Все INSERT
операции в таблицах выше описывают операции добавления, которые не считывают данные из одной таблицы перед фиксацией. INSERT
операции, содержащие вложенные запросы, считывающие ту же таблицу, поддерживают ту же параллельность, что MERGE
и .
REORG
операции имеют семантику изоляции, идентичную OPTIMIZE
перезаписи файлов данных для отражения изменений, записанных в векторах удаления. При применении REORG
обновления протоколы таблиц изменяются, что конфликтует со всеми текущими операциями.
Запись конфликтов без параллелизма на уровне строк
В следующей таблице описывается, какие пары операций записи могут конфликтовать на каждом уровне изоляции.
Таблицы не поддерживают параллелизм на уровне строк, если в них определены секции или не включены векторы удаления. Databricks Runtime 14.2 или более поздней версии требуется для параллелизма на уровне строк.
Примечание.
Таблицы с столбцами удостоверений не поддерживают одновременные транзакции. См. статью "Использование столбцов удостоверений" в Delta Lake.
INSERT (1) | UPDATE, DELETE, MERGE INTO | OPTIMIZE | |
---|---|---|---|
INSERT | Не может конфликтовать | ||
UPDATE, DELETE, MERGE INTO | Не удается конфликтуть в WriteSerializable. Может конфликтуться в Сериализуемом режиме. См . разделы избежать конфликтов. | Может конфликтовывать в сериализуемой и writeSerializable. См . разделы избежать конфликтов. | |
OPTIMIZE; | Не может конфликтовать | Не удается конфликтуть с таблицами с включенными векторами удаления, если ZORDER BY не используется. Может конфликтуть в противном случае. |
Не удается конфликтуть с таблицами с включенными векторами удаления, если ZORDER BY не используется. Может конфликтуть в противном случае. |
Внимание
(1) Все INSERT
операции в таблицах выше описывают операции добавления, которые не считывают данные из одной таблицы перед фиксацией. INSERT
операции, содержащие вложенные запросы, считывающие ту же таблицу, поддерживают ту же параллельность, что MERGE
и .
REORG
операции имеют семантику изоляции, идентичную OPTIMIZE
перезаписи файлов данных для отражения изменений, записанных в векторах удаления. При применении REORG
обновления протоколы таблиц изменяются, что конфликтует со всеми текущими операциями.
Ограничения параллелизма на уровне строк
Некоторые ограничения применяются к параллелизму на уровне строк. Для следующих операций разрешение конфликтов следует обычному параллелизму для конфликтов записи в Azure Databricks. См. конфликты записи без параллелизма на уровне строк.
- Команды со сложными условными предложениями, включая следующие:
- Условия для сложных типов данных, таких как структуры, массивы или карты.
- Условия, использующие недетерминированные выражения и вложенные запросы.
- Условия, содержащие коррелированные вложенные запросы.
- В Databricks Runtime 14.2
MERGE
команды должны использовать явный предикат в целевой таблице для фильтрации строк, соответствующих исходной таблице. Для разрешения слиянием фильтр проверяет только строки, которые могут конфликтовать на основе условий фильтра в параллельных операциях.
Примечание.
Обнаружение конфликтов на уровне строк может увеличить общее время выполнения. В случае многих параллельных транзакций модуль записи определяет задержку по разрешению конфликтов и конфликтам.
Все ограничения для векторов удаления также применяются. См . ограничения.
Когда Delta Lake фиксирует без чтения таблицы?
Операции Delta Lake INSERT
или добавления не считывают состояние таблицы перед фиксацией, если выполнены следующие условия:
- Логика выражается с помощью
INSERT
логики SQL или режима добавления. - Логика не содержит вложенных запросов или условных условий, ссылающихся на таблицу, предназначенную операцией записи.
Как и в других фиксациях, Delta Lake проверяет и разрешает версии таблиц при фиксации с помощью метаданных в журнале транзакций, но версия таблицы на самом деле не считывается.
Примечание.
Многие распространенные шаблоны используют MERGE
операции для вставки данных на основе условий таблицы. Хотя эту логику можно переписать с помощью INSERT
инструкций, если любое условное выражение ссылается на столбец в целевой таблице, эти инструкции имеют те же ограничения параллелизма, что MERGE
и .
Запись сериализуемых и сериализуемых уровней изоляции
Уровень изоляции таблицы определяет степень, до которой транзакция должна быть изолирована от изменений, выполняемых параллельными транзакциями. Delta Lake на Azure Databricks поддерживает два уровня изоляции: Serializable и WriteSerializable.
Serializable: наиболее надежный уровень изоляции. Это гарантирует, что зафиксированные операции записи и все операции чтения будут иметь уровень Serializable. Операции разрешены до тех пор, пока существует серийная последовательность их поочередного выполнения, которая создает тот же результат, который отображается в таблице. Для операций записи серийная последовательность идентична показанной в журнале таблицы.
WriteSerializable (по умолчанию): более слабый уровень изоляции, чем Serializable. Он обеспечивает сериализуемость только операций записи (но не операций чтения). Однако он все-таки более надежен, чем изоляция уровня Snapshot. Уровень изоляции WriteSerializable используется по умолчанию, поскольку он обеспечивает оптимальный баланс согласованности и доступности данных для наиболее распространенных операций.
В этом режиме содержимое таблицы Delta может отличаться от ожидаемой последовательности операций, наблюдаемых в журнале таблиц. Это обусловлено тем, что данный режим позволяет выполнять определенные пары одновременных операций записи (скажем, операций X и Y), чтобы результат был таким же, как если бы операция Y выполнялась до операции X (то есть, сериализовалась между ними), даже несмотря на то, что в журнале отображалось бы, что операция Y зафиксирована после операции X. Чтобы запретить такое изменение порядка, задайте для уровня изоляции таблицы значение Serializable, чтобы эти транзакции завершались ошибкой.
Операции чтения всегда используют изоляцию Snapshot. Уровень изоляции операции записи определяет, может ли читатель просматривать моментальный снимок таблицы, который «никогда не существовал» по данным журнала.
Что касается уровня Serializable, читатель всегда видит только те таблицы, которые соответствуют данным журнала. Для уровня WriteSerializable читатель видел бы таблицу, которая не существует в журнале изменений.
Например, рассмотрим txn1, длительно выполняемая операция удаления и txn2, которое вставляет данные, удаленные операцией txn1. txn2 и txn1 завершены, и они записываются в журнал в таком порядке. Согласно журналу данные, вставленные в txn2, не должны существовать в таблице. Что касается уровня Serializable, читатель никогда не будет видеть данные, вставленные операцией txn2. Однако, на уровне WriteSerializable читатель может в определенный момент просмотреть данные, вставленные операцией txn2.
Дополнительные сведения о том, какие типы операций могут конфликтовать друг с другом на каждом уровне изоляции и возможных ошибках, см. в разделе "Избегайте конфликтов" с помощью условий секционирования и разрознения команд.
Выбор уровня изоляции
Уровень изоляции задается с помощью команды ALTER TABLE
.
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)
где <level-name>
обозначает Serializable
или WriteSerializable
.
Например, чтобы изменить уровень изоляции со стандартного WriteSerializable
на Serializable
, выполните:
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
Избегайте конфликтов с помощью условий секционирования и разбиения команд
Во всех случаях, отмеченных как "может конфликтовать", возникновение конфликта двух операций зависит от того, работают ли они с одним и тем же набором файлов. Можно разделить эти два набора файлов путем секционирования таблицы на те же столбцы, которые используются в условиях операций. Например, команды UPDATE table WHERE date > '2010-01-01' ...
и DELETE table WHERE date < '2010-01-01'
будут конфликтовать, если таблица не секционирована по дате, так как обе команды могут попытаться изменить один и тот же набор файлов. Секционирование таблицы по date
позволит избежать конфликта. Поэтому секционирование таблицы в соответствии с условиями, которые часто используются в команде, может значительно сократить конфликты. Однако секционирование таблицы по столбцу с высоким кратностью может привести к другим проблемам производительности из-за большого количества подкаталогов.
Исключения конфликтов
При возникновении конфликта транзакции отобразится одно из следующих исключений:
ConcurrentAppendException
Это исключение возникает, когда параллельная операция добавляет файлы в ту же секцию (или в любое место несекционированной таблицы), которую считывает операция. Добавление файлов может быть вызвано операциями INSERT
, DELETE
, UPDATE
или MERGE
.
С уровнем WriteSerializable
изоляции по умолчанию файлы, добавленные в слепые INSERT
операции (т. е. операции, которые слепо добавляют данные без чтения данных) не конфликтуют с какой-либо операцией, даже если они касаются одной секции (или где-либо в непартиментной таблице). Если уровень изоляции имеет значение Serializable
, то при добавлении вслепую могут возникать конфликты.
Это исключение часто возникает во время выполнения параллельных операций DELETE
, UPDATE
или MERGE
. Хотя параллельные операции могут физически обновлять разные каталоги секций, одна из них может считывать секцию, которую в этот момент обновляет другая операция, что приводит к конфликту. Этого можно избежать, явно указав разделение в условии операции. Рассмотрим следующий пример.
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
Предположим, приведенный выше код запущен параллельно для различных дат или стран. Поскольку каждое задание работает в отдельной секции целевой разностной таблицы, конфликтов не должно быть. Однако условие не является достаточно явным и позволяет сканировать всю таблицу, что может вызывать конфликты с параллельными операциями обновления любых других секций. Вместо этого можно изменить инструкцию, добавив в условие объединения определенную дату и страну, как показано в следующем примере.
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
Теперь эту операцию можно безопасно запускать в параллельном режиме в разные даты и в разных странах.
ConcurrentDeleteReadException
Это исключение возникает, когда параллельная операция удаляет файл, который читает ваша операция. Распространенные причины: операция DELETE
, UPDATE
или MERGE
, которая повторно записывает файлы.
ConcurrentDeleteDeleteException
Это исключение возникает, когда параллельная операция удаляет файл, который также удаляет ваша операция. Это может быть вызвано двумя одновременно выполняемыми операциями сжатия, которые выполняют повторную запись одних и тех же файлов.
MetadataChangedException
Это исключение возникает, когда параллельная транзакция обновляет метаданные разностной таблицы. Распространенные причины — операции ALTER TABLE
или операции записи в разностную таблицу, которые обновляют схему таблицы.
ConcurrentTransactionException
Если потоковый запрос, использующий одно и то же расположение контрольной точки, одновременно запускается несколько раз и пытается одновременно выполнить запись в разностную таблицу. Никогда не допускайте, чтобы два потоковых запроса одновременно использовали одно и то же расположение контрольной точки и выполнялись одновременно.
ProtocolChangedException
Это исключение может возникать в следующих случаях:
- При обновлении таблицы Delta до новой версии протокола. Для выполнения будущих операций может потребоваться обновить среду выполнения Databricks.
- Когда несколько операций записи одновременно создают или заменяют таблицу.
- Когда несколько операций записи одновременно выполняют запись по пути в свободном месте накопителя.
Дополнительные сведения см. в статье о том, как Azure Databricks управляет совместимостью функций Delta Lake?
Поведение предварительной версии параллелизма на уровне строк (устаревшая версия)
В этом разделе описано поведение предварительной версии для параллелизма на уровне строк в Databricks Runtime 14.1 и ниже. Параллелизм на уровне строк всегда требует векторов удаления.
В Databricks Runtime 13.3 LTS и более поздних версиях таблицы с поддержкой отказоустойчивой кластеризации автоматически включают параллелизм на уровне строк.
В Databricks Runtime 14.0 и 14.1 можно включить параллелизм на уровне строк для таблиц с векторами удаления, задав следующую конфигурацию для кластера или SparkSession:
spark.databricks.delta.rowLevelConcurrencyPreview = true
В Databricks Runtime 14.1 и ниже вычисления, отличные от Photon, поддерживают только параллелизм на уровне строк для DELETE
операций.