Поделиться через


Использование foreachBatch для записи в произвольные приемники данных

В этой статье описывается использование foreachBatch структурированной потоковой передачи для записи выходных данных потокового запроса в источники данных, которые не имеют существующего приемника потоковой передачи.

Шаблон streamingDF.writeStream.foreachBatch(...) кода позволяет применять пакетные функции к выходным данным каждого микропакета потокового запроса. Функции, используемые с foreachBatch, принимают два параметра:

  • Кадр данных с выходными данными микропакета.
  • Уникальный идентификатор микропакета.

Необходимо использовать foreachBatch для операций слияния Delta Lake в структурированной потоковой передаче. См. Upsert из потоковых запросов с использованием foreachBatch.

Применение дополнительных операций с кадрами данных

Многие операции с кадрами данных и наборами данных не поддерживаются при потоковой передаче кадров данных, так как в этих случаях Spark не поддерживает создание добавочных планов. С помощью foreachBatch() можно применить некоторые из этих операций к каждому выходному микропакету. Например, можно использовать foreachBatch() и SQL-операцию MERGE INTO, чтобы записывать выходные данные агрегатов потоковой обработки в таблицу Delta в режиме обновления. Дополнительные сведения см. в MERGE INTO.

Внимание

  • foreachBatch() предоставляет гарантии записи не менее одного раза. Однако вы можете использовать batchId, предоставленные функции в качестве способа дедупликации выходных данных и получения точной гарантии. В любом случае вам нужно будет самостоятельно принять решение о комплексной семантике.
  • foreachBatch() не работает с режимом непрерывной обработки, так как существенно зависит от выполнения микропакетов запроса потоковой передачи. Если данные записываются в непрерывном режиме, используйте foreach().
  • При использовании foreachBatch с оператором с сохранением состояния важно полностью обработать каждый пакет перед завершением обработки. См. полное потребление каждого пакетного DataFrame

Пустой кадр данных можно вызвать с помощью foreachBatch() пользовательского кода, чтобы обеспечить правильную работу. Например:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Изменения foreachBatch поведения в Databricks Runtime 14.0

В Databricks Runtime 14.0 и выше для вычислений, настроенных в стандартном режиме доступа, применяются следующие изменения поведения:

  • print() команды записывают выходные данные в журналы драйверов.
  • Невозможно получить доступ к подмодулу dbutils.widgets внутри функции.
  • Все файлы, модули или объекты, на которые ссылается функция, должны быть сериализуемыми и доступными в Spark.

Повторное использование существующих источников данных пакетной обработки

С помощью foreachBatch()можно использовать существующие средства записи пакетных данных для приемников данных, которые, возможно, не поддерживают структурированную потоковую передачу. Вот несколько таких случаев.

Многие другие источники данных пакетной службы можно использовать из foreachBatch(). См. статью "Подключение к источникам данных" и внешним службам.

Запись в несколько расположений

Если необходимо записать выходные данные потокового запроса в несколько расположений, Databricks рекомендует использовать несколько записей структурированной потоковой передачи для оптимальной параллелизации и пропускной способности.

Использование foreachBatch для записи в несколько приемников сериализует выполнение потоковой записи, что может увеличить задержку для каждого микропакета.

Если вы используете foreachBatch для записи в несколько таблиц Delta, см. раздел 'Идемпотентная запись таблиц' в foreachBatch.

Полностью использовать каждый пакетный кадр данных

При использовании операторов с отслеживанием состояния (например, с помощью dropDuplicatesWithinWatermark), каждая пакетная итерация должна обработать весь DataFrame или перезапустить запрос. Если вы не используете весь DataFrame, потоковый запрос завершится неудачей на следующей партии данных.

Это может произойти в нескольких случаях. В следующих примерах показано, как исправить запросы, которые неправильно используют кадр данных.

Намеренное использование подмножества пакета

Если вас интересует только подмножество пакета, вы можете использовать код, например, следующий.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

В этом случае batch_df.show(2) обрабатывает только первые два элемента в пакете, что ожидается, но если в пакете больше элементов, они должны быть обработаны. Следующий код использует полный кадр данных.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row)
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

do_nothing Здесь функция тихо игнорирует остальную часть DataFrame.

Обработка ошибки в пакете

При выполнении foreachBatch процесса может возникнуть ошибка. Вы можете иметь следующий код (в этом случае пример намеренно вызывает ошибку для отображения проблемы).

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Обрабатывая ошибку (и незаметно игнорируя её), остальная часть пакета может не быть обработана. Существует два варианта обработки этой ситуации.

Сначала вы можете повторно инициировать ошибку, которая передаст её на слой оркестрации, чтобы повторить выполнение пакета. Это может решить ошибку, если это временная проблема, или передать ее вашей операционной команде, чтобы она попыталась вручную исправить. Для этого измените partial_func код следующим образом:

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

Второй вариант, если вы хотите поймать исключение и игнорировать остальную часть пакета, — изменить код на этот.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row)
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Этот код использует do_nothing функцию для автоматического пропуска остальной части пакета.