Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Уровень изоляции таблицы определяет степень, до которой транзакция должна быть изолирована от изменений, выполняемых параллельными операциями. Конфликты записи в Azure Databricks зависят от уровня изоляции.
Delta Lake предоставляет гарантии транзакций ACID для операций чтения и записи. Это означает следующее.
- Несколько авторов в нескольких кластерах могут одновременно изменять раздел таблицы. Писатели видят согласованное моментальное отображение таблицы, и запись данных осуществляется в последовательном порядке.
- Читатели продолжают видеть согласованное представление моментального снимка таблицы, с которого началось задание Azure Databricks, даже если таблица была изменена во время выполнения задания.
См. Что такое гарантии ACID в Azure Databricks?.
Примечание.
Azure Databricks использует Delta Lake для всех таблиц по умолчанию. В этой статье описывается поведение Delta Lake в Azure Databricks.
Внимание
Изменения метаданных вызывают сбой всех одновременных операций записи. Эти операции включают изменения в протокол таблицы, свойства таблицы или схему данных.
Потоковое чтение завершается сбоем при обнаружении коммита, который изменяет метаданные таблицы. Чтобы сохранить поток, необходимо перезапустить его. Рекомендуемые методы см. в разделе "Производственные соображения для структурированной потоковой передачи".
Ниже приведены примеры запросов, которые изменяют метаданные:
-- Set a table property.
ALTER TABLE table-name SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
-- Enable a feature using a table property and update the table protocol.
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);
-- Drop a table feature.
ALTER TABLE table_name DROP FEATURE deletionVectors;
-- Upgrade to UniForm.
REORG TABLE table_name APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2));
-- Update the table schema.
ALTER TABLE table_name ADD COLUMNS (col_name STRING);
Конфликты записи с параллелизмом на уровне строк
Параллелизм на уровне строк уменьшает конфликты между параллельными операциями записи, обнаруживая изменения на уровне строки и автоматически разрешая конфликты, возникающие при одновременном обновлении или удалении разных строк в одном файле данных.
Параллелизм на уровне строк обычно доступен в Databricks Runtime 14.2 и выше. Параллелизм на уровне строк по умолчанию поддерживается для следующих условий:
- Таблицы с включенными векторами удаления и без секционирования.
- Таблицы с жидкой кластеризацией, если вы не отключили векторы удаления.
Таблицы с секциями не поддерживают параллелизм на уровне строк, но по-прежнему могут избежать конфликтов между OPTIMIZE и всеми другими операциями записи при включении векторов удаления. См. ограничения параллелизма на уровне строк.
Для других версий среды выполнения Databricks см. раздел "Поведение параллелизма на уровне строк (предварительный просмотр, устаревшая версия)".
MERGE INTO для поддержки параллелизма на уровне строк требуется Photon в Databricks Runtime 14.2. В Databricks Runtime 14.3 LTS и более поздних версиях Фотон не требуется.
В следующей таблице описывается, какие пары операций записи могут конфликтовывать на каждом уровне изоляции с включенным параллелизмом на уровне строк.
Примечание.
Таблицы с столбцами удостоверений не поддерживают одновременные транзакции. См. «Использование идентификационных столбцов в Delta Lake».
| INSERT (1) | UPDATE, УДАЛИТЬ, MERGE INTO | OPTIMIZE | |
|---|---|---|---|
| INSERT | Не может конфликтовать | ||
| UPDATEУДАЛИТЬ MERGE INTO | Не удается разрешить конфликт в WriteSerializable. Может конфликтовать в режиме сериализуемости при изменении той же строки. См. ограничения параллелизма на уровне строк. | Может возникать конфликт при изменении одной и той же строки. См. ограничения параллелизма на уровне строк. | |
| OPTIMIZE | Не может конфликтовать | Может возникать конфликт, когда используется ZORDER BY. Нельзя подвергаться конфликту иначе. |
Может возникать конфликт, когда используется ZORDER BY. Нельзя подвергаться конфликту иначе. |
Внимание
(1) Все INSERT операции в таблицах выше описывают операции добавления, которые не считывают данные из той же таблицы перед коммитом.
INSERT операции, содержащие подзапросы, выполняющие считывание из той же таблицы, поддерживают такой же параллелизм, как и MERGE.
REORG операции имеют семантику изоляции, идентичную той, которая применяется при перезаписи файлов данных для отражения изменений, записанных в векторах удаления. При применении обновления REORG протоколы таблиц изменяются, что приводит к конфликту со всеми выполняющимися операциями.
Запись конфликтов без параллелизма на уровне строк
В следующей таблице описывается, какие пары операций записи могут конфликтовать на каждом уровне изоляции.
Таблицы не поддерживают параллелизм на уровне строк, если в них определены секции или не включены векторы удаления. Databricks Runtime 14.2 или более поздней версии требуется для параллелизма на уровне строк.
Примечание.
Таблицы с столбцами удостоверений не поддерживают одновременные транзакции. См. «Использование идентификационных столбцов в Delta Lake».
| INSERT (1) | UPDATE, УДАЛИТЬ, MERGE INTO | OPTIMIZE | |
|---|---|---|---|
| INSERT | Не может конфликтовать | ||
| UPDATEУДАЛИТЬ MERGE INTO | Не удается разрешить конфликт в WriteSerializable. Может конфликтуться в Сериализуемом режиме. См. избежать конфликтов с разделами. | Может возникать конфликт в Serializable и WriteSerializable. См. избежать конфликтов с разделами. | |
| OPTIMIZE | Не может конфликтовать | С конфликтами в таблицах с включенными векторами удаления нельзя сталкиваться, если не используется ZORDER BY. Может возникнуть конфликт в противном случае. |
С конфликтами в таблицах с включенными векторами удаления нельзя сталкиваться, если не используется ZORDER BY. Может возникнуть конфликт в противном случае. |
Внимание
(1) Все INSERT операции в таблицах выше описывают операции добавления, которые не считывают данные из той же таблицы перед коммитом.
INSERT операции, содержащие подзапросы, выполняющие считывание из той же таблицы, поддерживают такой же параллелизм, как и MERGE.
REORG операции имеют семантику изоляции, идентичную той, которая применяется при перезаписи файлов данных для отражения изменений, записанных в векторах удаления. При применении обновления REORG протоколы таблиц изменяются, что приводит к конфликту со всеми выполняющимися операциями.
Ограничения параллелизма на уровне строк
Некоторые ограничения применяются к параллелизму на уровне строк. Для следующих операций разрешение конфликтов следует обычному параллелизму для конфликтов записи в Azure Databricks. См. конфликты записи без параллелизма на уровне строк.
- Команды со сложными условными предложениями, включая следующие:
- Условия для сложных типов данных, таких как структуры, массивы или карты.
- Условия, использующие недетерминированные выражения и вложенные запросы.
- Условия, содержащие коррелированные вложенные запросы.
- В Databricks Runtime 14.2
MERGEкоманды должны использовать явный предикат в целевой таблице для фильтрации строк, соответствующих исходной таблице. Для разрешения конфликтов при слиянии фильтр проверяет только те строки, которые могут конфликтовать на основе условий фильтра при параллельных операциях.
Примечание.
Обнаружение конфликтов на уровне строк может увеличить общее время выполнения. В случае множественных параллельных транзакций записывающий отдает приоритет задержке, а не разрешению конфликтов, что может привести к их возникновению.
Все ограничения для векторов удаления также применяются. Смотрите ограничения.
Когда Delta Lake фиксирует без чтения таблицы?
Операции Delta Lake INSERT или добавления не читают состояние таблицы перед коммитом, если выполнены следующие условия:
- Логика выражается с помощью
INSERTлогики SQL или режима добавления. - Логика не содержит вложенных запросов или условных операторов, ссылающихся на таблицу, являющуюся целью операции записи.
Как и в других фиксациях, Delta Lake проверяет и обрабатывает версии таблиц в процессе фиксации с использованием метаданных в журнале транзакций, но ни одна версия таблицы фактически не считывается.
Примечание.
Многие распространенные шаблоны используют MERGE операции, чтобы вставлять данные, основываясь на условиях таблицы. Хотя эту логику можно переписать с помощью инструкций INSERT, если какое-либо условное выражение ссылается на столбец в целевой таблице, эти инструкции имеют те же ограничения параллелизма, что и MERGE.
Запись сериализуемых и сериализуемых уровней изоляции
Уровень изоляции таблицы определяет степень, до которой транзакция должна быть изолирована от изменений, выполняемых параллельными транзакциями. Delta Lake на Azure Databricks поддерживает два уровня изоляции: Serializable и WriteSerializable.
Serializable: наиболее строгий уровень изоляции. Это гарантирует, что зафиксированные операции записи и все операции чтения будут в режиме Serializable. Операции разрешены до тех пор, пока существует серийная последовательность их поочередного выполнения, которая создает тот же результат, который отображается в таблице. Для операций записи последовательность точно такая же, как и в истории таблицы.
WriteSerializable (по умолчанию): более слабый уровень изоляции, чем Serializable. Он обеспечивает сериализуемость только операций записи (но не операций чтения). Однако он все-таки сильнее, чем изоляция Snapshot. Уровень изоляции WriteSerializable используется по умолчанию, поскольку он обеспечивает оптимальный баланс согласованности и доступности данных для наиболее распространенных операций.
В этом режиме содержимое таблицы Delta может отличаться от ожидаемой последовательности операций, наблюдаемых в журнале таблиц. Это обусловлено тем, что данный режим позволяет выполнять определенные пары одновременных операций записи (скажем, операций X и Y), чтобы результат был таким же, как если бы операция Y выполнялась до операции X (то есть, сериализовалась между ними), даже несмотря на то, что в журнале отображалось бы, что операция Y зафиксирована после операции X. Чтобы запретить такое изменение порядка, задайте для уровня изоляции таблицы значение Serializable, чтобы эти транзакции завершались ошибкой.
Операции чтения всегда используют изоляцию снимка. Уровень изоляции операции записи определяет, возможно ли читателю увидеть моментальный снимок таблицы, который, согласно данным журнала, «никогда не существовал».
На уровне Serializable читатель всегда видит только те таблицы, которые соответствуют истории. Для уровня WriteSerializable читатель может видеть таблицу, которая не существует в журнале изменений.
Например, рассмотрим сценарий, когда долго выполняющаяся транзакция удаления и вставка транзакции начинаются одновременно и считывает версию v0. Транзакция вставки фиксирует первую и создает версию v1. После этого транзакция удаления пытается зафиксировать v2:
t0: deleteTxn_START
t1: insertTxn_START
t2: insertTxn_COMMIT(v1)
t3: deleteTxn_COMMIT(v2)
В этом сценарии данные, вставленные deleteTxn в него, не отображаются, insertTxn поэтому они не удаляются:
- В
Serializableизоляции не допускается фиксацияdeleteTxnи конфликт возникает. - В
WriteSerializableизоляции допускается фиксация,deleteTxnтак как транзакции могут быть упорядочены. Результирующее состояние таблицы происходит так, как будтоinsertTxnпроизошло послеdeleteTxnэтого, поэтому вставленные строки являются частью таблицы. Однако журнал разностных данных показывает порядок физической фиксации и чтоinsertTxn(версия 1) произошла доdeleteTxn(версии 2).
Дополнительные сведения о том, какие типы операций могут конфликтовать друг с другом на каждом уровне изоляции и возможных ошибках, см. в разделе "Избегайте конфликтов" с помощью условий секционирования и разрознения команд.
Выбор уровня изоляции
Уровень изоляции задается с помощью команды ALTER TABLE.
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)
где <level-name> обозначает Serializable или WriteSerializable.
Например, чтобы изменить уровень изоляции со стандартного WriteSerializable на Serializable, выполните:
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
Избегайте конфликтов с помощью сегментации и раздельных условий команд
Во всех случаях, отмеченных как "может конфликтовать", возникновение конфликта двух операций зависит от того, работают ли они с одним и тем же набором файлов. Можно разделить эти два набора файлов путем секционирования таблицы на те же столбцы, которые используются в условиях операций. Например, команды UPDATE table WHERE date > '2010-01-01' ... и DELETE table WHERE date < '2010-01-01' будут конфликтовать, если таблица не секционирована по дате, так как обе команды могут попытаться изменить один и тот же набор файлов. Секционирование таблицы по date позволит избежать конфликта. Поэтому секционирование таблицы в соответствии с условиями, которые часто используются в команде, может значительно сократить конфликты. Однако разбиение таблицы по столбцу с высокой кардинальностью может привести к проблемам производительности из-за большого количества подкаталогов.
Конфликтные исключения
При возникновении конфликта транзакции вы увидите одно из следующих исключений:
ConcurrentAppendException
Это исключение возникает, когда параллельная операция добавляет файлы в ту же секцию (или в любое место несекционированной таблицы), которую считывает операция. Добавление файлов может быть вызвано операциями INSERT, DELETE, UPDATE или MERGE.
С уровнем изоляции по умолчанию, файлы, добавленные слепыми операциями (т.е. операциям, которые слепо добавляют данные без чтения данных), не конфликтуют ни с одной операцией, даже если они затрагивают ту же самую секцию (или где-либо в неразделенной таблице). Если уровень изоляции установлен на Serializable, то при добавлении вслепую могут возникать конфликты.
Важное: Слепые добавления могут конфликтовать в режиме WriteSerializable, если выполняются несколько параллельных транзакций, работающих DELETE, UPDATEили MERGE операции, которые могут ссылаться на значения, вставленные слепыми добавлениями. Чтобы избежать этого конфликта, выполните одно из следующих действий.
- Убедитесь, что параллельные
DELETE,UPDATEилиMERGEоперации не считывают добавленные данные. - Иметь не более одной операции
DELETE,UPDATEилиMERGE, которое может считывать добавленные данные.
Это исключение часто возникает во время выполнения параллельных операций DELETE, UPDATE или MERGE. Хотя параллельные операции могут физически обновлять разные каталоги секций, одна из них может считывать секцию, которую в этот момент обновляет другая операция, что приводит к конфликту. Этого можно избежать, явно указав разделение в условии операции. Рассмотрим следующий пример.
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
Предположим, приведенный выше код запущен параллельно для различных дат или стран. Так как каждое задание работает над независимым разделом в целевой таблице Delta, вы не ожидаете конфликтов. Однако условие не является достаточно явным и позволяет сканировать всю таблицу, что может вызывать конфликты с параллельными операциями обновления любых других секций. Вместо этого можно изменить инструкцию, добавив в условие объединения определенную дату и страну, как показано в следующем примере.
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
Теперь эту операцию можно безопасно запускать в параллельном режиме в разные даты и в разных странах.
ConcurrentDeleteReadException
Это исключение возникает, когда параллельная операция удаляет файл, который читает ваша операция. Распространенные причины: операция DELETE, UPDATE или MERGE, которая повторно записывает файлы.
ConcurrentDeleteDeleteException
Это исключение возникает, когда параллельная операция удаляет файл, который также удаляет ваша операция. Это может быть вызвано двумя одновременно выполняемыми операциями сжатия, которые выполняют повторную запись одних и тех же файлов.
MetadataChangedException
Это исключение возникает, когда параллельная транзакция обновляет метаданные таблицы Delta. Распространенные причины — операции ALTER TABLE или записи в вашу таблицу Delta, которые обновляют схему таблицы.
Исключение параллельной транзакции
Если потоковый запрос, использующий одно и то же расположение контрольной точки, одновременно запускается несколько раз и пытается одновременно выполнить запись в разностную таблицу. Никогда не допускайте, чтобы два потоковых запроса одновременно использовали одно и то же расположение контрольной точки и выполнялись одновременно.
ИсключениеИзмененияПротокола
Это исключение может возникать в следующих случаях:
- При переходе таблицы Delta на новую версию протокола. Для выполнения будущих операций может потребоваться обновить среду выполнения Databricks.
- Когда несколько людей одновременно создают или заменяют таблицу.
- Когда несколько авторов одновременно пишут в пустой путь.
Дополнительные сведения см. в разделе "Совместимость функций Delta Lake" и протоколы .
Поведение в режиме предварительного просмотра параллелизма на уровне строк (устаревший режим)
В этом разделе описано поведение предварительной версии для параллелизма на уровне строк в Databricks Runtime 14.1 и ниже. Согласованность на уровне строк всегда требует векторов удаления.
В Databricks Runtime 13.3 LTS и более поздних версиях таблицы с поддержкой жидкой кластеризации автоматически включают конкурентный доступ на уровне строк.
В Databricks Runtime 14.0 и 14.1 можно включить параллелизм на уровне строк для таблиц с векторами удаления, задав следующую конфигурацию для кластера или SparkSession:
spark.databricks.delta.rowLevelConcurrencyPreview = true
В Databricks Runtime 14.1 и ниже вычисления, не использующие Photon, поддерживают только параллелизм на уровне строк для DELETE операций.