Использование foreachBatch для записи в произвольные приемники данных
В этой статье описывается использование foreachBatch
структурированной потоковой передачи для записи выходных данных потокового запроса в источники данных, которые не имеют существующего приемника потоковой передачи.
Шаблон streamingDF.writeStream.foreachBatch(...)
кода позволяет применять пакетные функции к выходным данным каждого микропакета потокового запроса. Функции, используемые с foreachBatch
двумя параметрами:
- Кадр данных с выходными данными микропакета.
- Уникальный идентификатор микропакета.
Необходимо использовать foreachBatch
для операций слияния Delta Lake в структурированной потоковой передаче. См . upsert из потоковых запросов с помощью foreachBatch.
Применение дополнительных операций с кадрами данных
Многие операции с кадрами данных и наборами данных не поддерживаются при потоковой передаче кадров данных, так как в этих случаях Spark не поддерживает создание добавочных планов. С помощью foreachBatch()
можно применить некоторые из этих операций к каждому выходному микропакету. Например, можно использовать foreachBath()
и операцию SQL MERGE INTO
для записи выходных данных агрегатов потоковой передачи в разностную таблицу в режиме обновления. Дополнительные сведения см. в разделе MERGE INTO.
Внимание
foreachBatch()
предоставляет гарантии записи не менее одного раза. Но можно использовать значениеbatchId
, предоставленное для функции, как способ дедупликации выходных данных и получения гарантии строго однократной записи. В любом случае вам нужно будет самостоятельно принять решение о комплексной семантике.foreachBatch()
не работает с режимом непрерывной обработки, так как существенно зависит от выполнения микропакетов запроса потоковой передачи. Если данные записываются в непрерывном режиме, используйтеforeach()
.
Пустой кадр данных можно вызвать с помощью foreachBatch()
пользовательского кода, чтобы обеспечить правильную работу. Например:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Изменения foreachBatch
поведения в Databricks Runtime 14.0
В Databricks Runtime 14.0 и более поздних версиях для вычислений, настроенных в режиме общего доступа, применяются следующие изменения поведения:
print()
команды записывают выходные данные в журналы драйверов.- Невозможно получить доступ к подмодулу
dbutils.widgets
внутри функции. - Все файлы, модули или объекты, на которые ссылается функция, должны быть сериализуемыми и доступными в Spark.
Повторное использование существующих источников данных пакетной обработки
С помощью foreachBatch()
можно использовать существующие средства записи пакетных данных для приемников данных, которые, возможно, не поддерживают структурированную потоковую передачу. Вот несколько таких случаев.
Многие другие источники данных пакетной службы можно использовать из foreachBatch()
. См. статью "Подключение к источникам данных".
Запись в несколько расположений
Если необходимо записать выходные данные потокового запроса в несколько расположений, Databricks рекомендует использовать несколько записей структурированной потоковой передачи для оптимальной параллелизации и пропускной способности.
Использование foreachBatch
для записи в несколько приемников сериализует выполнение потоковой записи, что может увеличить задержку для каждого микропакета.
Если вы используете foreachBatch
для записи в несколько таблиц Delta, см . статью "Идемпотентная таблица" записывается в foreachBatch.