Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
На этой странице описаны предварительные требования и конфигурация, необходимые для выполнения запросов в режиме реального времени в структурированной потоковой передаче. Для пошагового руководства см. Руководство: Запуск рабочей нагрузки потоковой передачи в режиме реального времени. Для получения информации о режиме реального времени см. Режим реального времени в структурированной потоковой передаче.
Необходимые условия
Чтобы использовать режим реального времени, необходимо настроить вычислительные ресурсы в соответствии со следующими требованиями:
- Используйте классические вычисления. Поддерживаются выделенные и стандартные режимы доступа. Стандартный режим доступа поддерживается только для Python. Декларативные конвейеры и бессерверные кластеры Lakeflow Spark не поддерживаются.
- Используйте Databricks Runtime 16.4 LTS и более поздних версий.
- Отключите автомасштабирование.
- Отключите Photon.
- Задайте для параметра
spark.databricks.streaming.realTimeMode.enabledзначениеtrue. - Отключите точечные экземпляры, чтобы избежать прерываний.
Для рабочих нагрузок, чувствительных к задержке, с использованием определяемых пользователем функций, Databricks рекомендуется применять выделенный режим доступа. См. сведения о функциях таблиц.
Инструкции по созданию и настройке классических вычислений см. в справочнике по конфигурации вычислений.
Конфигурация запроса
Чтобы запустить запрос в режиме реального времени, необходимо включить триггер в режиме реального времени. Триггеры реального времени поддерживаются только в режиме обновления.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Оценка размеров вычислительных ресурсов
Вы можете выполнить одно задание в режиме реального времени для каждого вычислительного ресурса, если вычислительные ресурсы имеют достаточно слотов задач.
Для выполнения в режиме низкой задержки общее количество доступных слотов задач должно быть больше или равно количеству задач на всех этапах запроса.
Примеры вычислений слотов
| Тип конвейера | Конфигурация | Обязательные слоты |
|---|---|---|
| Одноэтапное без отслеживания состояния (источник Kafka + приемник) |
maxPartitions = 8 |
8 слотов |
| Двухэтапное состояние (источник Kafka + перемещение) |
maxPartitions = 8, разбиение на части = 20 |
28 слотов (8 + 20) |
| Три этапа (источник Kafka + перетасовка + перераспределение) |
maxPartitions = 8, два этапа перетасовки 20 каждый |
48 слотов (8 + 20 + 20) |
Если не задано maxPartitions, используйте количество секций в разделе Kafka.