Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Контрольные точки и журналы предзаписи взаимодействуют для обеспечения гарантий обработки для потоковых структурированных рабочих нагрузок. Контрольная точка отслеживает сведения, определяющие запрос, включая сведения о состоянии и обработанные записи. При удалении файлов в каталоге контрольных точек или переходе на новое расположение контрольной точки следующий запуск запроса начинается заново.
Каталог контрольной точки содержит следующее:
- Смещения: исходные смещения обрабатываются в каждом микропакете. Это позволяет запросу возобновить работу с того места, в котором он остался без повторной обработки данных.
- Фиксации: запись о том, какие микропакеты были зафиксированы в приемнике, обеспечивая семантику выполнения ровно один раз.
-
Состояние: для запросов с отслеживанием состояния (агрегаций, соединений поток-поток, дедупликации и пользовательских операторов
transformWithStateс отслеживанием состояния) контрольная точка хранит метаданные о операторе с отслеживанием состояния, схеме состояния и контрольно-точечном содержимом хранилища, управляемым поставщиком хранилища состояний. - Метаданные: уникальный идентификатор запроса, используемый для идентификации запроса. Параметры конфигурации хранятся в журнале смещения.
Каждый запрос должен иметь свое расположение контрольной точки. Несколько запросов никогда не должны разделять одно местоположение.
Примечание.
В этой статье рассматриваются контрольные точки структурированной потоковой передачи для потоковых запросов. Сведения об использовании DataFrame.checkpoint() с томами Unity Catalog для усечения планов выполнения непотоковых DataFrame см. в контрольных точках DataFrame в томах.
Включение контрольных точек для запросов структурированной потоковой передачи
Перед выполнением потокового запроса необходимо указать параметр checkpointLocation, как показано в следующем примере:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Примечание.
Некоторые приемники, такие как выходные данные для display() в записных книжках и приемнике memory, автоматически создают временное расположение контрольной точки, если этот параметр не указан. Эти временные расположения контрольных точек не гарантируют отказоустойчивость или согласованность данных и могут не быть должным образом очищены. Databricks рекомендует всегда указывать расположение контрольной точки для этих приемников.
Восстановление после изменений в структурированном запросе потоковой передачи
Существуют ограничения на то, какие изменения в потоковом запросе разрешены между перезапусками из одной и той же контрольной точки.
Изменения, которые обычно требуют новой контрольной точки, включают число или тип входных источников, подписанные разделы Kafka или пути автозагрузчика, типы операций с отслеживанием состояния, схему состояния и тип приемника выходных данных.
Изменения, которые обычно безопасны, включают добавление или удаление фильтров, изменение ограничений скорости, интервалов триггеров и обновление пользовательской логики функции в пределах mapGroupsWithState (хотя семантика может измениться).
В следующем разделе описываются изменения, которые либо не разрешены, либо эффект изменения не определен, где:
- Термин допустимый означает, что вы можете выполнить указанное изменение, но то, будет ли точно определена семантика его воздействия, зависит от запроса и изменения.
- Термин недопустимый означает, что не следует выполнять указанное изменение, так как перезапущенный запрос, скорее всего, завершится сбоем с непредсказуемыми ошибками.
-
sdfпредставляет потоковый DataFrame/Dataset, созданный с помощьюsparkSession.readStream.
Типы изменений в запросах структурированной потоковой передачи
Изменения в числе или типе источников входных данных (то есть другой источник) — это недопустимо.
Изменения параметров входных источников: разрешено ли это и правильно ли определяется семантика изменения, зависит от источника и запроса, включая элементы управления приемом, такие как
maxFilesPerTriggerилиmaxOffsetsPerTrigger. Далее мы рассмотрим несколько примеров.Добавление, удаление и изменение пределов скорости разрешено:
spark.readStream.format("kafka").option("subscribe", "article")до
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)Дополнительные сведения см. в разделе "Настройка размера пакета структурированной потоковой передачи" в Azure Databricks
Изменения в подписанных статьях и файлах обычно не разрешены, так как результаты непредсказуемы:
spark.readStream.format("kafka").option("subscribe", "article")наspark.readStream.format("kafka").option("subscribe", "newarticle").
Изменения в интервале запуска: можно изменять запуск между добавочными партиями и временными интервалами. См. раздел "Изменение интервалов триггеров между запусками".
Изменения в типе приемника выходных данных — разрешены изменения между несколькими конкретными сочетаниями приемников. Это необходимо проверить отдельно для каждого случая. Далее мы рассмотрим несколько примеров.
- Менять приемник файлов на приемник Kafka разрешено. Kafka увидит только новые данные.
- Менять приемник Kafka на приемник файлов запрещено.
- Разрешено изменять тип приема данных Kafka на foreach и наоборот.
Изменения параметров приемника выходных данных: разрешено ли это и правильно ли определяется семантика изменения, зависит от приемника и запроса. Далее мы рассмотрим несколько примеров.
- Изменения в выходном каталоге приемника файлов запрещены:
sdf.writeStream.format("parquet").option("path", "/somePath")наsdf.writeStream.format("parquet").option("path", "/anotherPath") - Изменения в выходном разделе разрешены:
sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2") - Изменения в определенном пользователем приемнике foreach (т. е. в коде
ForeachWriter) разрешены, но семантика изменения зависит от кода.
- Изменения в выходном каталоге приемника файлов запрещены:
Изменения в операциях, подобных проекции, фильтру или сопоставлению. Некоторые варианты разрешены. Рассмотрим пример.
- Добавление и удаление фильтров разрешено:
sdf.selectExpr("a")наsdf.where(...).selectExpr("a").filter(...). - Изменения в проекциях с той же выходной схемой разрешены:
sdf.selectExpr("stringColumn AS json").writeStreamдоsdf.select(to_json(...).as("json")).writeStream. - Изменения в проекциях с другой выходной схемой допускаются условно:
sdf.selectExpr("a").writeStreamsdf.selectExpr("b").writeStreamразрешено только в том случае, если приемник выходных данных разрешает изменение схемы с"a"на"b".
- Добавление и удаление фильтров разрешено:
Изменения в операциях с состоянием: Некоторые операции с потоковыми данными должны сохранять данные о состоянии, чтобы непрерывно обновлять результат. Структурированная потоковая передача автоматически создает контрольные точки для данных состояния в отказоустойчивом хранилище (например, DBFS, хранилище BLOB-объектов Azure) и восстанавливает их после перезапуска. Однако предполагается, что схема данных состояния остается одинаковой во время перезапуска. Это означает, что любые изменения (т. е. добавления, удаления или изменения схемы) в операции потокового запроса с сохранением состояния не допускаются между перезапусками. Представлен список операций с сохранением состояния, схема которых не должна быть изменена при перезапусках, чтобы обеспечить восстановление сохранённого состояния.
-
Агрегат потоковой передачи — например,
sdf.groupBy("a").agg(...). Любое изменение числа или типа ключей группирования или агрегатов не допускается. -
Дедупликация потоковой передачи — например,
sdf.dropDuplicates("a"). Любое изменение числа или типа ключей группирования или агрегатов не допускается. -
соединение stream-stream: например,
sdf1.join(sdf2, ...)(т. е. оба входа создаются сsparkSession.readStream). Изменения в столбцах схемы или столбцах для эквисоединения не допускаются. Изменения в типе соединения (внешнем или внутреннем) запрещены. Другие изменения в условии соединения не определены. -
Произвольная операция с отслеживанием состояния — например,
sdf.groupByKey(...).mapGroupsWithState(...)илиsdf.groupByKey(...).flatMapGroupsWithState(...). Любое изменение схемы определяемого пользователем состояния и тип времени ожидания не допускается. Любое изменение в определяемой пользователем функции сопоставления состояний разрешено, но семантический результат изменения зависит от пользовательской логики. Если вы действительно хотите поддерживать изменения схемы состояния, вы можете явно кодировать и декодировать сложные структуры данных состояния в байтах с помощью схемы кодирования и декодирования, поддерживающей миграцию схем. Например, если вы сохраняете состояние в виде байтов, закодированных в Avro, то вы можете изменить схему Avro-состояния между перезапусками запросов, так как это восстанавливает двоичное состояние.
-
Агрегат потоковой передачи — например,
Это важно
Операторы с сохранением состояния dropDuplicates() и dropDuplicatesWithinWatermark() может не перезапуститься из-за проверки совместимости схемы состояния при смене режимов доступа к вычислениям.
Изменение между режимами доступа с выделенной изоляцией и без изоляции разрешено. Допускается изменение между стандартными и бессерверными режимами доступа. Не пытайтесь меняться между другими сочетаниями режима доступа.
Чтобы избежать этой ошибки, не изменяйте режим доступа к вычислительным ресурсам для потоковых запросов, содержащих эти операторы.