Структурированные контрольные точки потоковой передачи

Контрольные точки и журналы предзаписи взаимодействуют для обеспечения гарантий обработки для потоковых структурированных рабочих нагрузок. Контрольная точка отслеживает сведения, определяющие запрос, включая сведения о состоянии и обработанные записи. При удалении файлов в каталоге контрольных точек или переходе на новое расположение контрольной точки следующий запуск запроса начинается заново.

Каталог контрольной точки содержит следующее:

  • Смещения: исходные смещения обрабатываются в каждом микропакете. Это позволяет запросу возобновить работу с того места, в котором он остался без повторной обработки данных.
  • Фиксации: запись о том, какие микропакеты были зафиксированы в приемнике, обеспечивая семантику выполнения ровно один раз.
  • Состояние: для запросов с отслеживанием состояния (агрегаций, соединений поток-поток, дедупликации и пользовательских операторов transformWithStateс отслеживанием состояния) контрольная точка хранит метаданные о операторе с отслеживанием состояния, схеме состояния и контрольно-точечном содержимом хранилища, управляемым поставщиком хранилища состояний.
  • Метаданные: уникальный идентификатор запроса, используемый для идентификации запроса. Параметры конфигурации хранятся в журнале смещения.

Каждый запрос должен иметь свое расположение контрольной точки. Несколько запросов никогда не должны разделять одно местоположение.

Примечание.

В этой статье рассматриваются контрольные точки структурированной потоковой передачи для потоковых запросов. Сведения об использовании DataFrame.checkpoint() с томами Unity Catalog для усечения планов выполнения непотоковых DataFrame см. в контрольных точках DataFrame в томах.

Включение контрольных точек для запросов структурированной потоковой передачи

Перед выполнением потокового запроса необходимо указать параметр checkpointLocation, как показано в следующем примере:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Примечание.

Некоторые приемники, такие как выходные данные для display() в записных книжках и приемнике memory, автоматически создают временное расположение контрольной точки, если этот параметр не указан. Эти временные расположения контрольных точек не гарантируют отказоустойчивость или согласованность данных и могут не быть должным образом очищены. Databricks рекомендует всегда указывать расположение контрольной точки для этих приемников.

Восстановление после изменений в структурированном запросе потоковой передачи

Существуют ограничения на то, какие изменения в потоковом запросе разрешены между перезапусками из одной и той же контрольной точки.

Изменения, которые обычно требуют новой контрольной точки, включают число или тип входных источников, подписанные разделы Kafka или пути автозагрузчика, типы операций с отслеживанием состояния, схему состояния и тип приемника выходных данных.

Изменения, которые обычно безопасны, включают добавление или удаление фильтров, изменение ограничений скорости, интервалов триггеров и обновление пользовательской логики функции в пределах mapGroupsWithState (хотя семантика может измениться).

В следующем разделе описываются изменения, которые либо не разрешены, либо эффект изменения не определен, где:

  • Термин допустимый означает, что вы можете выполнить указанное изменение, но то, будет ли точно определена семантика его воздействия, зависит от запроса и изменения.
  • Термин недопустимый означает, что не следует выполнять указанное изменение, так как перезапущенный запрос, скорее всего, завершится сбоем с непредсказуемыми ошибками.
  • sdf представляет потоковый DataFrame/Dataset, созданный с помощью sparkSession.readStream.

Типы изменений в запросах структурированной потоковой передачи

  • Изменения в числе или типе источников входных данных (то есть другой источник) — это недопустимо.

  • Изменения параметров входных источников: разрешено ли это и правильно ли определяется семантика изменения, зависит от источника и запроса, включая элементы управления приемом, такие как maxFilesPerTrigger или maxOffsetsPerTrigger. Далее мы рассмотрим несколько примеров.

    • Добавление, удаление и изменение пределов скорости разрешено:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      до

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      

      Дополнительные сведения см. в разделе "Настройка размера пакета структурированной потоковой передачи" в Azure Databricks

    • Изменения в подписанных статьях и файлах обычно не разрешены, так как результаты непредсказуемы: spark.readStream.format("kafka").option("subscribe", "article") на spark.readStream.format("kafka").option("subscribe", "newarticle").

  • Изменения в интервале запуска: можно изменять запуск между добавочными партиями и временными интервалами. См. раздел "Изменение интервалов триггеров между запусками".

  • Изменения в типе приемника выходных данных — разрешены изменения между несколькими конкретными сочетаниями приемников. Это необходимо проверить отдельно для каждого случая. Далее мы рассмотрим несколько примеров.

    • Менять приемник файлов на приемник Kafka разрешено. Kafka увидит только новые данные.
    • Менять приемник Kafka на приемник файлов запрещено.
    • Разрешено изменять тип приема данных Kafka на foreach и наоборот.
  • Изменения параметров приемника выходных данных: разрешено ли это и правильно ли определяется семантика изменения, зависит от приемника и запроса. Далее мы рассмотрим несколько примеров.

    • Изменения в выходном каталоге приемника файлов запрещены: sdf.writeStream.format("parquet").option("path", "/somePath") на sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Изменения в выходном разделе разрешены: sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • Изменения в определенном пользователем приемнике foreach (т. е. в коде ForeachWriter) разрешены, но семантика изменения зависит от кода.
  • Изменения в операциях, подобных проекции, фильтру или сопоставлению. Некоторые варианты разрешены. Рассмотрим пример.

    • Добавление и удаление фильтров разрешено: sdf.selectExpr("a") на sdf.where(...).selectExpr("a").filter(...).
    • Изменения в проекциях с той же выходной схемой разрешены: sdf.selectExpr("stringColumn AS json").writeStream до sdf.select(to_json(...).as("json")).writeStream.
    • Изменения в проекциях с другой выходной схемой допускаются условно: sdf.selectExpr("a").writeStreamsdf.selectExpr("b").writeStream разрешено только в том случае, если приемник выходных данных разрешает изменение схемы с "a" на "b".
  • Изменения в операциях с состоянием: Некоторые операции с потоковыми данными должны сохранять данные о состоянии, чтобы непрерывно обновлять результат. Структурированная потоковая передача автоматически создает контрольные точки для данных состояния в отказоустойчивом хранилище (например, DBFS, хранилище BLOB-объектов Azure) и восстанавливает их после перезапуска. Однако предполагается, что схема данных состояния остается одинаковой во время перезапуска. Это означает, что любые изменения (т. е. добавления, удаления или изменения схемы) в операции потокового запроса с сохранением состояния не допускаются между перезапусками. Представлен список операций с сохранением состояния, схема которых не должна быть изменена при перезапусках, чтобы обеспечить восстановление сохранённого состояния.

    • Агрегат потоковой передачи — например, sdf.groupBy("a").agg(...). Любое изменение числа или типа ключей группирования или агрегатов не допускается.
    • Дедупликация потоковой передачи — например, sdf.dropDuplicates("a"). Любое изменение числа или типа ключей группирования или агрегатов не допускается.
    • соединение stream-stream: например, sdf1.join(sdf2, ...) (т. е. оба входа создаются с sparkSession.readStream). Изменения в столбцах схемы или столбцах для эквисоединения не допускаются. Изменения в типе соединения (внешнем или внутреннем) запрещены. Другие изменения в условии соединения не определены.
    • Произвольная операция с отслеживанием состояния — например, sdf.groupByKey(...).mapGroupsWithState(...) или sdf.groupByKey(...).flatMapGroupsWithState(...). Любое изменение схемы определяемого пользователем состояния и тип времени ожидания не допускается. Любое изменение в определяемой пользователем функции сопоставления состояний разрешено, но семантический результат изменения зависит от пользовательской логики. Если вы действительно хотите поддерживать изменения схемы состояния, вы можете явно кодировать и декодировать сложные структуры данных состояния в байтах с помощью схемы кодирования и декодирования, поддерживающей миграцию схем. Например, если вы сохраняете состояние в виде байтов, закодированных в Avro, то вы можете изменить схему Avro-состояния между перезапусками запросов, так как это восстанавливает двоичное состояние.

Это важно

Операторы с сохранением состояния dropDuplicates() и dropDuplicatesWithinWatermark() может не перезапуститься из-за проверки совместимости схемы состояния при смене режимов доступа к вычислениям.

Изменение между режимами доступа с выделенной изоляцией и без изоляции разрешено. Допускается изменение между стандартными и бессерверными режимами доступа. Не пытайтесь меняться между другими сочетаниями режима доступа.

Чтобы избежать этой ошибки, не изменяйте режим доступа к вычислительным ресурсам для потоковых запросов, содержащих эти операторы.