Использование режима реального времени в декларативных конвейерах Spark Lakeflow

Important

Режим реального времени в Lakeflow Spark Declarative Pipelines доступен в рамках общедоступной предварительной версии в Databricks Runtime 18.1.2 для канала предварительной версии.

Режим реального времени обеспечивает обработку данных со сверхнизкой задержкой, а сквозная задержка может составлять всего пять миллисекунд. Используйте режим реального времени для рабочих рабочих нагрузок, требующих немедленного реагирования на потоковую передачу данных, таких как обнаружение мошенничества и персонализация в режиме реального времени.

Режим реального времени также доступен непосредственно в структурированной потоковой передаче за пределами конвейеров. См. режим реального времени в структурированной потоковой передаче.

Как режим реального времени достигает низкой задержки

Режим реального времени отличается от стандартной непрерывной обработки тремя ключевыми способами:

  • Длительные пакеты: система обрабатывает данные по мере того, как она становится доступной в источнике в течение длительных пакетов (по умолчанию — пять минут).
  • Одновременное планирование этапов: все этапы запроса планируются одновременно. Вычислительный ресурс должен иметь достаточно доступных слотов задач, чтобы охватывать все этапы одновременно. См. размер вычислений.
  • Потоковое перемешивание: данные передаются между этапами по мере их появления, а не после завершения вышестоящего этапа перед началом последующего.

Интервал между контрольными точками (задаваемый через pipelines.trigger.interval) определяет, как часто состояние и смещения источников сохраняются в отказоустойчивом хранилище. Более длинные интервалы снижают накладные расходы на создание контрольных точек, но увеличивают время восстановления после сбоя и задерживают отправку метрик. Более короткие интервалы повышают устойчивость, но добавляют затраты.

Режим реального времени и непрерывные конвейеры

Режим реального времени — это специализированный тип непрерывного триггера. Непрерывный режим по-прежнему необходим — режим реального времени дополнительно обеспечивает оптимизации задержки на уровне потоков. Чтобы использовать режим реального времени, конвейер должен сначала выполняться в непрерывном режиме. Затем режим реального времени применяет дополнительные оптимизации на уровне потоков, чтобы добиться задержки менее секунды сверх того, что обеспечивает стандартная непрерывная обработка.

Включение режима реального времени требует трех шагов конфигурации:

  1. Задайте для конвейера непрерывный режим.
  2. Включите режим реального времени на уровне конвейера.
  3. Определите поток обновления в режиме реального времени.

Requirements

Requirement Value
Databricks Runtime 18.1.2 в канале предварительного просмотра SDP
Тип вычислений Классические вычисления или бессерверные

Настройка режима реального времени

Шаг 1. Установка конвейера в непрерывный режим

В параметрах конвейера задайте для режима конвейеразначение "Непрерывный" или задайте его в формате JSON конвейера:

{
  "continuous": true
}

Шаг 2. Включение режима реального времени на уровне конвейера

В параметрах конвейера добавьте следующий ключ в конфигурацию Spark в разделе Дополнительная > конфигурация Spark:

spark.databricks.streaming.realTimeMode.enabled = true

Вы также можете задать это в JSON конвейера:

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}

Шаг 3. Определение потока обновления в режиме реального времени

Для режима реального времени требуется поток обновления. Используйте dp.create_sink() для определения целевого объекта для вывода, затем используйте декоратор @dp.update_flow, где pipelines.trigger имеет значение "RealTime", а target указывает на приёмник.

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

Параметры конфигурации уровня потока:

Parameter Обязательно По умолчанию Description
pipelines.trigger Да Установите значение "RealTime", чтобы включить режим реального времени для этого потока.
pipelines.trigger.interval Нет "5 minutes" Интервал между контрольными точками. Определяет частоту фиксации состояния и смещения. Более короткие значения улучшают возможность восстановления; более длинные значения снижают нагрузку.

Примеры кода

Kafka из Kafka в Kafka

Считывать из топика Kafka и записывать в целевой выходной объект Kafka:

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

Обогатить с помощью широковещательного соединения

Подключите поток Kafka к статической справочной таблице. Поддерживаются только широковещательные соединения (потоковая передача в статические). Объединения потоков не поддерживаются в режиме реального времени.

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            .select("event_key", "lookup_value", "timestamp")
    )

Агрегация

Подсчитывайте события по ключу с помощью groupBy с сохранением состояния. Задайте spark.sql.shuffle.partitions для сопоставления счетчика входных секций для операций с отслеживанием состояния:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
    )

Поддерживаемые источники и приемники

Connector Источник Как приемник Примечания.
Apache Kafka
AWS MSK Использует интерфейс, совместимый с Kafka.
Центры событий Azure (соединитель Kafka) Использует интерфейс, совместимый с Kafka.
Amazon Kinesis Не поддерживаются Используется только для режима EFO (расширенный Fan-Out) .
Дельта Не поддерживаются Не поддерживаются

Определение размеров вычислительных ресурсов

Вы можете запустить один конвейер в режиме реального времени для каждого вычислительного ресурса, если вычислительные ресурсы имеют достаточно слотов задач. Доступных слотов задач должно хватать для выполнения всех задач на всех этапах обработки запроса.

Тип конвейера Configuration Обязательные слоты для задач
Одноэтапное без отслеживания состояния (источник Kafka + приемник) maxPartitions = 8 8
Двухэтапное состояние (источник Kafka + перемещение) maxPartitions = 8, разбиение на части = 20 28 (8 + 20)
Трёхэтапный (источник Kafka + два шаффла) maxPartitions = 8, два этапа перетасовки 20 каждый 48 (8 + 20 + 20)

Если не задано maxPartitions, используйте количество секций в разделе Kafka.

Поддержка операторов

Категория Operator Поддерживается
Стейтлесс Выбор, проекция
UDFs Scala UDF ✓ (с ограничениями)
UDFs UDF на языке Python ✓ (с ограничениями)
Агрегация сумма, количество, максимум, минимум, среднее
Windowing Кувыркаясь, скользя
Windowing Session Не поддерживаются
Дедупликация dropDuplicates ✓ (неограниченное состояние)
Дедупликация dropDuplicatesWithinWatermark Не поддерживаются
Joins Присоединение к широковещательной таблице
Joins Присоединение потока к потоку Не поддерживаются
Обычай transformWithState ✓ (с различиями в поведении)
Обычай union ✓ (с ограничениями)
Обычай forEach Не поддерживаются
Обычай flatMapGroupsWithState Не поддерживаются
Обычай mapPartitions Не поддерживаются
Обычай forEachBatch Не поддерживаются

transformWithState в режиме реального времени

transformWithState поддерживается в режиме реального времени со следующими отличиями от микропакетной обработки:

  • handleInputRows вызывается один раз для каждой строки, а не один раз для каждого ключа в каждом пакете. Итератор inputRows выдает одно значение для каждого вызова.
  • Таймеры времени событий не поддерживаются. Таймеры времени обработки срабатывают при завершении длительно выполняющегося пакетного задания, если данные не поступили.
  • Функция transformWithStateInPandas не поддерживается.

UDF Pandas в режиме реального времени

Чтобы свести к минимуму задержку при использовании UDF pandas, установите значение spark.sql.execution.arrow.maxRecordsPerBatch в 1. Это оптимизирует задержку за счет пропускной способности. Если пропускная способность также важна, установите значение 100 или выше.

Мониторинг производительности режима реального времени

Режим StreamingQueryProgress реального времени отображает метрики задержки в поле latencies. Получить доступ к этим метрикам можно через StreamingQueryListener или проверив свойство lastProgress у потокового запроса.

Метрика Description
processingLatencyMs Время между тем, когда запись считывается потоком и когда она полностью обрабатывается потоком
sourceQueuingLatencyMs Время между моментом успешной записи записи в шину сообщений (например, временем добавления записи в журнал в Kafka) и моментом, когда она впервые считывается потоком
e2eLatencyMs Общая сквозная задержка от момента создания записи в источнике до полной обработки потоком

Каждая метрика представляется в виде перцентилей p50, p90, p95 и p99.

Ограничения

Рекомендуется использовать один поток в режиме реального времени для каждого конвейера. Допускается несколько потоков, но конкуренция за слоты задач между ними увеличивает задержку.

Полный список ограничений оператора и источника см. в разделе об ограничениях в режиме реального времени.

Дополнительные ресурсы