Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Режим реального времени обеспечивает потоковую передачу сверхнизких задержек с сквозной задержкой до пяти миллисекунд, что делает его идеальным для рабочих рабочих нагрузок, таких как обнаружение мошенничества и персонализация в режиме реального времени. В этом руководстве описано, как настроить первый запрос потоковой передачи в режиме реального времени с помощью простого примера.
Концептуальные сведения о режиме реального времени, когда он используется и поддерживаемые функции, см. в режиме реального времени в структурированной потоковой передаче. Сведения о требованиях к конфигурации см. в разделе "Настройка режима реального времени".
Требования
Перед началом работы убедитесь, что у вас есть разрешения на создание классического вычислительного кластера, использующего конфигурацию, указанную в режиме реального времени настройки. Кроме того, обратитесь к администратору рабочей области, чтобы создать кластер в режиме реального времени.
Шаг 1. Создание записной книжки
Записные книжки предоставляют интерактивную среду для разработки и тестирования потоковых запросов. Эта записная книжка используется для записи запроса в режиме реального времени и непрерывного обновления результатов.
Чтобы создать записную книжку, выполните приведенные действия.
- Щелкните "Создать" на боковой панели и щелкните
Записная книжка.
- В ниспадающем меню вычислений выберите кластер в режиме реального времени.
- Выберите Python или Scala в качестве языка по умолчанию.
Шаг 2. Выполнение запроса в режиме реального времени
Скопируйте и вставьте следующий код в ячейку записной книжки и запустите ее. В этом примере используется источник скорости, который создает строки с заданной скоростью и отображает результаты в режиме реального времени.
Замечание
Функция display с realTime триггером доступна в Databricks Runtime 17.1 и выше.
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())
После выполнения кода вы увидите таблицу, которая обновляется в режиме реального времени при создании новых строк. В таблице отображается timestamp столбец и value столбец, который увеличивается по каждой строке.
Общие сведения о коде
Приведенный выше код демонстрирует основные компоненты потокового запроса в режиме реального времени. В следующих таблицах описываются ключевые параметры и элементы управления.
Python
| Параметр | Описание |
|---|---|
format("rate") |
Использует источник скорости, встроенный источник, который создает строки с настраиваемой скоростью. Это полезно для тестирования без внешних зависимостей. |
numPartitions |
Задает количество секций для созданных данных. |
rowsPerSecond |
Определяет, сколько строк создается в секунду. |
realTime="5 minutes" |
Включает режим реального времени. Интервал определяет частоту обновления контрольных точек запроса. Более длинные интервалы означают менее частые контрольные точки, но потенциально более длительные периоды восстановления после сбоев. |
outputMode="update" |
Для режима реального времени требуется режим вывода обновлений. |
Scala
| Параметр | Описание |
|---|---|
format("rate") |
Использует источник скорости, встроенный источник, который создает строки с настраиваемой скоростью. Это полезно для тестирования без внешних зависимостей. |
numPartitions |
Задает количество секций для созданных данных. |
rowsPerSecond |
Определяет, сколько строк создается в секунду. |
Trigger.RealTime() |
Включает режим реального времени с интервалом контрольной точки по умолчанию. Можно также указать интервал, например Trigger.RealTime("5 minutes"). |
OutputMode.Update() |
Для режима реального времени требуется режим вывода обновлений. |
Шаг 3. Проверка результатов
При выполнении запроса display функция создает таблицу, которая обновляется в режиме реального времени, так как источник скорости создает новые строки. Каждая строка содержит следующее:
- Метка времени, когда строка была создана источником данных о ставках.
- Счетчик, монотонно возрастающий и увеличивающийся с каждой новой строкой.
Таблица постоянно обновляется с минимальной задержкой, демонстрируя, как режим реального времени обрабатывает данные сразу после того, как он станет доступным. Это основное преимущество режима реального времени — возможность сразу просматривать и действовать над данными, а не ожидать пакетной обработки.
Дополнительные ресурсы
Теперь, когда вы выполнили первый запрос в режиме реального времени, изучите эти ресурсы для создания рабочих приложений потоковой передачи с помощью Kafka, Kinesis и других поддерживаемых источников: