Настройка режима реального времени

На этой странице описаны предварительные требования и конфигурация, необходимые для выполнения запросов в режиме реального времени в структурированной потоковой передаче. Для пошагового руководства см. Руководство: Запуск рабочей нагрузки потоковой передачи в режиме реального времени. Для получения информации о режиме реального времени см. Режим реального времени в структурированной потоковой передаче.

Необходимые условия

Чтобы использовать режим реального времени, необходимо настроить вычислительные ресурсы в соответствии со следующими требованиями:

  • Используйте классические вычисления. Поддерживаются выделенные и стандартные режимы доступа. Стандартный режим доступа поддерживается только для Python. Декларативные конвейеры и бессерверные кластеры Lakeflow Spark не поддерживаются.
  • Используйте Databricks Runtime 16.4 LTS и более поздних версий.
  • Отключите автомасштабирование.
  • Отключите Photon.
  • Задайте для параметра spark.databricks.streaming.realTimeMode.enabled значение true.
  • Отключите точечные экземпляры, чтобы избежать прерываний.

Для рабочих нагрузок, чувствительных к задержке, с использованием определяемых пользователем функций, Databricks рекомендуется применять выделенный режим доступа. См. сведения о функциях таблиц.

Инструкции по созданию и настройке классических вычислений см. в справочнике по конфигурации вычислений.

Конфигурация запроса

Чтобы запустить запрос в режиме реального времени, необходимо включить триггер в режиме реального времени. Триггеры реального времени поддерживаются только в режиме обновления.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Оценка размеров вычислительных ресурсов

Вы можете выполнить одно задание в режиме реального времени для каждого вычислительного ресурса, если вычислительные ресурсы имеют достаточно слотов задач.

Для выполнения в режиме низкой задержки общее количество доступных слотов задач должно быть больше или равно количеству задач на всех этапах запроса.

Примеры вычислений слотов

Тип конвейера Конфигурация Обязательные слоты
Одноэтапное без отслеживания состояния (источник Kafka + приемник) maxPartitions = 8 8 слотов
Двухэтапное состояние (источник Kafka + перемещение) maxPartitions = 8, разбиение на части = 20 28 слотов (8 + 20)
Три этапа (источник Kafka + перетасовка + перераспределение) maxPartitions = 8, два этапа перетасовки 20 каждый 48 слотов (8 + 20 + 20)

Если не задано maxPartitions, используйте количество секций в разделе Kafka.

Дополнительные ресурсы