Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Important
Режим реального времени в Lakeflow Spark Declarative Pipelines доступен в рамках общедоступной предварительной версии в Databricks Runtime 18.1.2 для канала предварительной версии.
Режим реального времени обеспечивает обработку данных со сверхнизкой задержкой, а сквозная задержка может составлять всего пять миллисекунд. Используйте режим реального времени для рабочих рабочих нагрузок, требующих немедленного реагирования на потоковую передачу данных, таких как обнаружение мошенничества и персонализация в режиме реального времени.
Режим реального времени также доступен непосредственно в структурированной потоковой передаче за пределами конвейеров. См. режим реального времени в структурированной потоковой передаче.
Как режим реального времени достигает низкой задержки
Режим реального времени отличается от стандартной непрерывной обработки тремя ключевыми способами:
- Длительные пакеты: система обрабатывает данные по мере того, как она становится доступной в источнике в течение длительных пакетов (по умолчанию — пять минут).
- Одновременное планирование этапов: все этапы запроса планируются одновременно. Вычислительный ресурс должен иметь достаточно доступных слотов задач, чтобы охватывать все этапы одновременно. См. размер вычислений.
- Потоковое перемешивание: данные передаются между этапами по мере их появления, а не после завершения вышестоящего этапа перед началом последующего.
Интервал между контрольными точками (задаваемый через pipelines.trigger.interval) определяет, как часто состояние и смещения источников сохраняются в отказоустойчивом хранилище. Более длинные интервалы снижают накладные расходы на создание контрольных точек, но увеличивают время восстановления после сбоя и задерживают отправку метрик. Более короткие интервалы повышают устойчивость, но добавляют затраты.
Режим реального времени и непрерывные конвейеры
Режим реального времени — это специализированный тип непрерывного триггера. Непрерывный режим по-прежнему необходим — режим реального времени дополнительно обеспечивает оптимизации задержки на уровне потоков. Чтобы использовать режим реального времени, конвейер должен сначала выполняться в непрерывном режиме. Затем режим реального времени применяет дополнительные оптимизации на уровне потоков, чтобы добиться задержки менее секунды сверх того, что обеспечивает стандартная непрерывная обработка.
Включение режима реального времени требует трех шагов конфигурации:
- Задайте для конвейера непрерывный режим.
- Включите режим реального времени на уровне конвейера.
- Определите поток обновления в режиме реального времени.
Requirements
| Requirement | Value |
|---|---|
| Databricks Runtime | 18.1.2 в канале предварительного просмотра SDP |
| Тип вычислений | Классические вычисления или бессерверные |
Настройка режима реального времени
Шаг 1. Установка конвейера в непрерывный режим
В параметрах конвейера задайте для режима конвейеразначение "Непрерывный" или задайте его в формате JSON конвейера:
{
"continuous": true
}
Шаг 2. Включение режима реального времени на уровне конвейера
В параметрах конвейера добавьте следующий ключ в конфигурацию Spark в разделе Дополнительная > конфигурация Spark:
spark.databricks.streaming.realTimeMode.enabled = true
Вы также можете задать это в JSON конвейера:
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
Шаг 3. Определение потока обновления в режиме реального времени
Для режима реального времени требуется поток обновления. Используйте dp.create_sink() для определения целевого объекта для вывода, затем используйте декоратор @dp.update_flow, где pipelines.trigger имеет значение "RealTime", а target указывает на приёмник.
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
Параметры конфигурации уровня потока:
| Parameter | Обязательно | По умолчанию | Description |
|---|---|---|---|
pipelines.trigger |
Да | — | Установите значение "RealTime", чтобы включить режим реального времени для этого потока. |
pipelines.trigger.interval |
Нет | "5 minutes" |
Интервал между контрольными точками. Определяет частоту фиксации состояния и смещения. Более короткие значения улучшают возможность восстановления; более длинные значения снижают нагрузку. |
Примеры кода
Kafka из Kafka в Kafka
Считывать из топика Kafka и записывать в целевой выходной объект Kafka:
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
Обогатить с помощью широковещательного соединения
Подключите поток Kafka к статической справочной таблице. Поддерживаются только широковещательные соединения (потоковая передача в статические). Объединения потоков не поддерживаются в режиме реального времени.
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
Агрегация
Подсчитывайте события по ключу с помощью groupBy с сохранением состояния. Задайте spark.sql.shuffle.partitions для сопоставления счетчика входных секций для операций с отслеживанием состояния:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
Поддерживаемые источники и приемники
| Connector | Источник | Как приемник | Примечания. |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | — |
| AWS MSK | ✓ | ✓ | Использует интерфейс, совместимый с Kafka. |
| Центры событий Azure (соединитель Kafka) | ✓ | ✓ | Использует интерфейс, совместимый с Kafka. |
| Amazon Kinesis | ✓ | Не поддерживаются | Используется только для режима EFO (расширенный Fan-Out) . |
| Дельта | Не поддерживаются | Не поддерживаются | — |
Определение размеров вычислительных ресурсов
Вы можете запустить один конвейер в режиме реального времени для каждого вычислительного ресурса, если вычислительные ресурсы имеют достаточно слотов задач. Доступных слотов задач должно хватать для выполнения всех задач на всех этапах обработки запроса.
| Тип конвейера | Configuration | Обязательные слоты для задач |
|---|---|---|
| Одноэтапное без отслеживания состояния (источник Kafka + приемник) |
maxPartitions = 8 |
8 |
| Двухэтапное состояние (источник Kafka + перемещение) |
maxPartitions = 8, разбиение на части = 20 |
28 (8 + 20) |
| Трёхэтапный (источник Kafka + два шаффла) |
maxPartitions = 8, два этапа перетасовки 20 каждый |
48 (8 + 20 + 20) |
Если не задано maxPartitions, используйте количество секций в разделе Kafka.
Поддержка операторов
| Категория | Operator | Поддерживается |
|---|---|---|
| Стейтлесс | Выбор, проекция | ✓ |
| UDFs | Scala UDF | ✓ (с ограничениями) |
| UDFs | UDF на языке Python | ✓ (с ограничениями) |
| Агрегация | сумма, количество, максимум, минимум, среднее | ✓ |
| Windowing | Кувыркаясь, скользя | ✓ |
| Windowing | Session | Не поддерживаются |
| Дедупликация | dropDuplicates |
✓ (неограниченное состояние) |
| Дедупликация | dropDuplicatesWithinWatermark |
Не поддерживаются |
| Joins | Присоединение к широковещательной таблице | ✓ |
| Joins | Присоединение потока к потоку | Не поддерживаются |
| Обычай | transformWithState |
✓ (с различиями в поведении) |
| Обычай | union |
✓ (с ограничениями) |
| Обычай | forEach |
Не поддерживаются |
| Обычай | flatMapGroupsWithState |
Не поддерживаются |
| Обычай | mapPartitions |
Не поддерживаются |
| Обычай | forEachBatch |
Не поддерживаются |
transformWithState в режиме реального времени
transformWithState поддерживается в режиме реального времени со следующими отличиями от микропакетной обработки:
-
handleInputRowsвызывается один раз для каждой строки, а не один раз для каждого ключа в каждом пакете. ИтераторinputRowsвыдает одно значение для каждого вызова. - Таймеры времени событий не поддерживаются. Таймеры времени обработки срабатывают при завершении длительно выполняющегося пакетного задания, если данные не поступили.
- Функция
transformWithStateInPandasне поддерживается.
UDF Pandas в режиме реального времени
Чтобы свести к минимуму задержку при использовании UDF pandas, установите значение spark.sql.execution.arrow.maxRecordsPerBatch в 1. Это оптимизирует задержку за счет пропускной способности. Если пропускная способность также важна, установите значение 100 или выше.
Мониторинг производительности режима реального времени
Режим StreamingQueryProgress реального времени отображает метрики задержки в поле latencies. Получить доступ к этим метрикам можно через StreamingQueryListener или проверив свойство lastProgress у потокового запроса.
| Метрика | Description |
|---|---|
processingLatencyMs |
Время между тем, когда запись считывается потоком и когда она полностью обрабатывается потоком |
sourceQueuingLatencyMs |
Время между моментом успешной записи записи в шину сообщений (например, временем добавления записи в журнал в Kafka) и моментом, когда она впервые считывается потоком |
e2eLatencyMs |
Общая сквозная задержка от момента создания записи в источнике до полной обработки потоком |
Каждая метрика представляется в виде перцентилей p50, p90, p95 и p99.
Ограничения
Рекомендуется использовать один поток в режиме реального времени для каждого конвейера. Допускается несколько потоков, но конкуренция за слоты задач между ними увеличивает задержку.
Полный список ограничений оператора и источника см. в разделе об ограничениях в режиме реального времени.