Поделиться через


Рекомендации по лучшим практикам для декларативных конвейеров Lakeflow Spark

На этой странице описаны рекомендуемые шаблоны для проектирования, сборки и эксплуатации конвейеров с помощью декларативных конвейеров Lakeflow Spark. Применяйте эти рекомендации при запуске нового конвейера или улучшении существующего.

Выбор подходящего типа набора данных

Декларативные конвейеры Spark Lakeflow предлагают три типа наборов данных: таблицы потоковой передачи, материализованные представления и временные представления. Выбор подходящего типа для каждого слоя конвейера позволяет избежать ненужных затрат на вычислительные ресурсы и делает ваш код легче для понимания.

Таблицы потоковой передачи — это правильный выбор для приема данных и преобразования потоковой передачи с низкой задержкой. Каждая входная строка считывается и обрабатывается только один раз, что делает их идеальными для нагрузок, предназначенных только для дополнения, больших объемов данных и обработки на основе событий из облачного хранилища или шин сообщений.

Материализованные представления подходят для сложных преобразований и аналитических запросов. Их результаты предварительно вычисляются и сохраняются в актуальном состоянии с помощью добавочного обновления, поэтому запросы к ним быстры. Невозможно напрямую изменить данные в материализованном представлении— определение запроса управляет выходными данными.

Временные представления — это представления, относящиеся к области действия конвейера, которые упорядочивают вашу логику преобразования данных без их материализации в хранилище. Используйте их для промежуточных шагов, которые не нуждаются в собственной таблице.

В следующей таблице приведены сведения об использовании каждого типа:

Сценарий использования Рекомендуемый тип Причина
Загрузка из облачного хранилища или шины сообщений Потоковая таблица Обрабатывает каждую запись единожды; оптимизирован для обработки больших объемов и рабочих нагрузок, ориентированных на добавление.
Потоки CDC (вставки, обновления, удаления) Потоковая таблица Используется в качестве целевого объекта APPLY CHANGES INTO для упорядоченной загрузки данных CDC с дедупликацией.
Сложные агрегаты и соединения Материализованное представление Постепенное обновление; избегает полной повторной вычисления для каждого обновления.
Ускорение запросов на панели мониторинга Материализованное представление Предварительно вычисленные результаты ускоряют выполнение запросов по сравнению с необработанными таблицами.
Промежуточные преобразования (без последующих обработчиков данных) Временное представление Упорядочивает логику конвейера без возникновения затрат на хранение.

Дополнительные сведения см. в таблицах потоковой передачи, материализованных представлениях и концепциях декларативных конвейеров Spark Lakeflow.

Используйте декларативный CDC вместо императивного MERGE

Для реализации отслеживания измененных данных (CDC) с императивными инструкциями SQL MERGE требуется значительный пользовательский код для обработки порядка событий, дедупликации, частичных обновлений и эволюции схемы. Каждое из этих проблем должно быть решено независимо, и результирующий код трудно поддерживать и тестировать.

Декларативные конвейеры Spark Lakeflow предоставляют APPLY CHANGES INTO инструкцию (SQL) и apply_changes() функцию (Python), которая обрабатывает упорядочивание, дедупликацию, события вне порядка и декларативную эволюцию схемы. Вы описываете форму потока изменений и целевую таблицу — конвейер обрабатывает остальное. APPLY CHANGES INTO поддерживает как SCD Type 1 (перезапись), так и SCD Type 2 (сохранение истории).

Для получения дополнительной информации см. Фиксация данных изменений и моментальные снимки и API-интерфейсы AUTO CDC: упрощение процесса фиксации изменений данных с помощью конвейеров.

Обеспечение качества данных с ожиданиями

Ожидания являются истинными или ложными выражениями SQL, применяемыми к каждой строке, передаваемой через набор данных. Если строка не выполняет условие, конвейер реагирует в соответствии с политикой обработки нарушений, которую вы настроили. Ожидания выдают метрики журналу событий конвейера независимо от политики, поэтому с течением времени можно отслеживать тенденции качества данных.

Выбор политики нарушения

Доступны три политики нарушений. Выберите тот вариант, который соответствует вашему уровню терпимости к некорректным данным.

  • предупреждение (по умолчанию): недопустимые записи записываются в целевую таблицу и помечены в метриках. Используйте эту политику, если требуется записать все данные, но требуется видимость проблем с качеством.
  • drop: недопустимые записи удаляются перед записью. Используйте это, если предполагается наличие ошибочных строк и они не должны передаваться далее.
  • Сбой: обновление конвейера останавливается на первой недопустимой записи. Используйте это для критически важных данных, в которых любая неисправная запись указывает на серьезную проблему на более раннем этапе обработки данных.

В следующих примерах показана каждая политика, применяемая к таблице потоковой передачи:

SQL

-- Warn: write invalid records but track them in metrics
CREATE OR REFRESH STREAMING TABLE orders_raw (
  CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL)
) AS SELECT * FROM STREAM read_files("/volumes/raw/orders", format => "json");

-- Drop: discard invalid records before writing
CREATE OR REFRESH STREAMING TABLE orders_clean (
  CONSTRAINT non_negative_amount EXPECT (amount >= 0) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(orders_raw);

-- Fail: stop the pipeline on any invalid record
CREATE OR REFRESH STREAMING TABLE orders_critical (
  CONSTRAINT required_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(orders_clean);

Python

from pyspark import pipelines as dp

# Warn: write invalid records but track them in metrics
@dp.table
@dp.expect("valid_order_id", "order_id IS NOT NULL")
def orders_raw():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/volumes/raw/orders")

# Drop: discard invalid records before writing
@dp.table
@dp.expect_or_drop("non_negative_amount", "amount >= 0")
def orders_clean():
    return spark.readStream.table("orders_raw")

# Fail: stop the pipeline on any invalid record
@dp.table
@dp.expect_or_fail("required_customer_id", "customer_id IS NOT NULL")
def orders_critical():
    return spark.readStream.table("orders_clean")

Карантин недопустимых записей

Если вы хотите сохранить отброшенные записи для исследования, а не автоматически игнорировать их, используйте схему карантина. Перенаправляйте строки, не прошедшие проверку, в отдельную потоковую таблицу с помощью двух потоков: один из них удаляет недопустимые строки из основной таблицы, а второй записывает только недопустимые строки в таблицу карантина. Это позволяет исследовать, исправлять и повторно обрабатывать плохие данные, не загрязняя чистый набор данных.

Подробный пример шаблона карантина см. в рекомендациях по ожиданиям и расширенных шаблонах.

Дополнительные сведения о ожиданиях см. в статье "Управление качеством данных с помощью ожиданий конвейера".

Параметризация конвейеров

Конвейеры имеют параметры каталога и схемы по умолчанию, поэтому код, считывающий и записываемый в одном каталоге и схеме, работает в разных средах без каких-либо параметров. Однако если вашему конвейеру необходимо ссылаться на второй каталог или схему — например, если требуется чтение из общего исходного каталога, который отличается между средой разработки и рабочей средой — избегайте жесткой прописки этих имен непосредственно в исходных кодах. Вместо этого определите их как параметры конфигурации конвейера (пары "ключ-значение", заданные в параметрах конвейера) и ссылайтесь на них в коде. Это позволяет правильно выполнять одну базу кода во всех средах, переключив значения параметров.

SQL

CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id, COUNT(txn_id) AS txn_count, SUM(amount) AS total_amount
FROM ${source_catalog}.sales.transactions
GROUP BY account_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import count, sum

@dp.materialized_view
def transaction_summary():
    source_catalog = spark.conf.get("source_catalog")
    return spark.read.table(f"{source_catalog}.sales.transactions") \
        .groupBy("account_id") \
        .agg(
            count("txn_id").alias("txn_count"),
            sum("amount").alias("total_amount")
        )

Дополнительные сведения см. в разделе "Использование параметров с конвейерами".

Выбор правильного режима конвейера для каждой среды

Режимы разработки и рабочего обновления

Конвейеры выполняются в режиме разработки или производственного обновления. Выберите режим, соответствующий вашей цели.

В режиме разработки конвейер повторно использует длительно работающий кластер при обновлениях и не выполняет повторных попыток при ошибках. Это ускоряет цикл итерации при написании и тестировании кода пайплайна, поскольку сведения об ошибках становятся доступными сразу, без ожидания перезапуска кластера.

В рабочем режиме кластер немедленно завершает работу после завершения каждого обновления, что снижает затраты на вычислительные ресурсы. Конвейер также включает в себя постепенное увеличение частоты повторных попыток, в том числе с перезапуском кластера, для автоматической обработки временных сбоев инфраструктуры. Используйте рабочий режим для всех запланированных запусков конвейера.

Запускаемый режим vs. режим непрерывного конвейера

Активированный режим обрабатывает все доступные данные, а затем останавливается. Это правильный выбор для подавляющего большинства пайплайнов: тех, которые выполняются по расписанию (почасово, ежедневно или по запросу) и не требуют свежести данных меньше минуты.

Непрерывный режим сохраняет работу кластера и обрабатывает новые данные по мере поступления. Это подходит только в том случае, если для вашего варианта использования требуется задержка в диапазоне секунд до минут. Так как для непрерывного режима требуется кластер, работающий постоянно, это значительно дороже, чем режим активации.

Дополнительные сведения см. в разделе "Триггер и непрерывный режим конвейера " и "Настройка конвейеров".

Использование жидкой кластеризации для макета данных

Кластеризация Liquid заменяет статическое секционирование и ZORDER для оптимизации макета данных в таблицах Delta. В отличие от секционирования, которое требует предварительного выбора столбца секционирования и может привести к перекосу данных при неравномерном распределении значений, жидкое кластерирование является самонастраивающимся, устойчивым к перекосу и инкрементальным — перезаписываются только те данные, которые нуждаются в реорганизации при каждом запуске.

Изменение столбцов кластеризации в любое время без перезаписи полной таблицы по мере развития шаблонов запросов.

Определите столбцы кластеризации в определении таблицы потоковой передачи:

SQL

CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY (event_date, region)
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");

Python

from pyspark import pipelines as dp

@dp.table(cluster_by=["event_date", "region"])
def events():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .load("/volumes/raw/events")

Если вы не уверены, по каким столбцам следует кластеризовать, используйте CLUSTER BY AUTO, чтобы Databricks автоматически выбрал оптимальные столбцы кластеризации, основываясь на нагрузке вашего запроса.

Дополнительные сведения см. в разделе Потоковые таблицы и Жидкая кластеризация для таблиц.

Управление конвейерами с помощью ci/CD и декларативных пакетов автоматизации

Управление версиями исходного кода конвейера и использование декларативных пакетов автоматизации для управления развертываниями в разных средах.

Дополнительные сведения см. в статье "Создание управляемого источником конвейера", преобразование конвейера в проект пакета и использование параметров с конвейерами.

Хранение кода пайплайна в системе управления версиями

Храните все исходные файлы конвейера (Python и SQL) вместе с конфигурацией пакета в репозитории Git. Управление версиями полного проекта дает полный журнал изменений, упрощает совместную работу и позволяет проверять изменения в среде разработки, прежде чем продвигать их в рабочую среду.

Databricks рекомендует декларативные пакеты автоматизации для управления этим рабочим процессом. Пакет определяет конфигурацию конвейера в YAML вместе с исходным кодом, а databricks bundle интерфейс командной строки позволяет проверять, развертывать и запускать конвейеры из терминала или системы CI/CD.

Использование целевых объектов пакета для изоляции среды

Пакеты позволяют использовать несколько целевых объектов (например, dev, staging, prod), каждый из которых имеет собственный набор параметров для переопределения имен каталогов, политик кластера, адресов уведомлений и других настроек. Объедините цели пакета с параметрами конвейера, чтобы вводить нужные значения среды во время развёртывания, чтобы поддерживать исходный код свободным от констант среды.

Типичный рабочий процесс выглядит следующим образом:

  1. Разработчик работает с функциональной веткой, развертывая в личный конвейер разработки в каталоге разработки.
  2. При слиянии с главной ветвью система CI запускает databricks bundle validate и databricks bundle deploy --target staging для проверки и развертывания конвейера в промежуточной среде.
  3. После успешного прохождения тестирования система CI автоматически развертывается в рабочей среде с databricks bundle deploy --target prod.

Рекомендации по потоковой передаче

Используйте эти шаблоны для управления состоянием, управления поздними данными и надежности конвейеров потоковой передачи.

Дополнительные сведения см. в статье "Оптимизация обработки с отслеживанием состояния с помощью водяных знаков", восстановление конвейера после сбоя контрольной точки потокового вещания и восстановление исторических данных с помощью конвейеров.

Используйте водяные знаки для операций с состоянием

Водяные знаки ограничивают состояние, которое сохраняется конвейером в памяти во время операций потоковой передачи с отслеживанием состояния, таких как агрегирование с использованием окон и дедупликация. Без водяного знака состояние растет неограниченно, так как обработчик накапливает данные для каждого возможного ключа, в конечном итоге вызывая ошибки нехватки памяти на длительных потоках обработки данных.

Подложка задает столбец метки времени и порог допуска для поздних данных. Записи, поступающие после прохождения порогового значения, удаляются. Выберите пороговое значение, которое уравновешивает вашу терпимость к поздним данным с затратами памяти на поддержание этого состояния открытым.

В следующем примере вычисляется одноминутная агрегация окна с трехминутным водяным знаком.

SQL

CREATE OR REFRESH STREAMING TABLE event_counts AS
SELECT window(event_time, '1 minute') AS time_window, region, COUNT(*) AS cnt
FROM STREAM(events_raw)
  WATERMARK event_time DELAY OF INTERVAL 3 MINUTES
GROUP BY time_window, region;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import window

@dp.table
def event_counts():
    return (
        spark.readStream.table("events_raw")
            .withWatermark("event_time", "3 minutes")
            .groupBy(window("event_time", "1 minute"), "region")
            .count()
    )

Замечание

Чтобы гарантировать, что агрегаты обрабатываются постепенно, а не полностью перекомпилируются при каждом обновлении, необходимо определить подложку.

Общие сведения о состоянии потоковой передачи и полном обновлении

Состояние потоковой передачи является добавочным: конвейер создает и сохраняет состояние между обновлениями, а не повторно компьютирует с нуля каждый раз. Это делает потоковую передачу с отслеживанием состояния эффективной, но это также означает, что если изменить логику запроса с отслеживанием состояния (например, изменение порогового значения подложки или изменение столбцов агрегирования), существующее состояние больше не совместимо с новой логикой. В этом случае необходимо выполнить полное обновление, чтобы повторно обработать все исторические данные с помощью новой логики и перестроить состояние с нуля.

Полное обновление также может привести к потере данных, если источник не сохраняет исторические данные. Например, если у источника Kafka краткий период хранения, то во время обновления могут быть доступны только последние несколько минут данных, в результате чего таблица будет содержать намного меньше данных, чем раньше. Тщательно продумайте изменения в логике запросов с сохранением состояния, особенно для потоков с большим объемом данных, где полностью обновить данные дорого или где источник имеет ограниченный срок хранения данных. Использование архитектуры медальона помогает создавать бронзовые таблицы с минимальным преобразованием и позволяет серебряным или золотым таблицам перекомпьютировать из бронзовых таблиц с полной историей.

Соединения потоков данных

Соединения потоков требуют водяного знака на обоих сторонах соединения и условия соединения с ограниченным временем. Интервал времени в условии соединения сообщает стриминговому движку, когда дальнейшие сопоставления невозможны, что позволяет удалить состояние, которое больше не может быть сопоставлено. Если опустить либо водяные знаки, либо условие с ограничением времени, состояние становится неограниченным.

В следующем примере события показа рекламы присоединяются к событиям щелчка, требуя, чтобы нажатие произошло в течение трех минут от впечатления:

SQL

CREATE OR REFRESH STREAMING TABLE impression_clicks AS
SELECT imp.ad_id, imp.impression_time, clk.click_time
FROM STREAM(ad_impressions)
    WATERMARK impression_time DELAY OF INTERVAL 3 MINUTES AS imp
JOIN STREAM(user_clicks)
    WATERMARK click_time DELAY OF INTERVAL 3 MINUTES AS clk
ON imp.ad_id = clk.ad_id
  AND clk.click_time BETWEEN imp.impression_time
    AND imp.impression_time + INTERVAL 3 MINUTES;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import expr

dp.create_streaming_table("impression_clicks")

@dp.append_flow(target="impression_clicks")
def join_impressions_and_clicks():
    impressions = spark.readStream.table("ad_impressions") \
        .withWatermark("impression_time", "3 minutes")
    clicks = spark.readStream.table("user_clicks") \
        .withWatermark("click_time", "3 minutes")
    return impressions.alias("imp").join(
        clicks.alias("clk"),
        expr("""
            imp.ad_id = clk.ad_id AND
            clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL 3 MINUTES
        """),
        "leftOuter"
    )

При присоединении потока к статической таблице (присоединению к моментальному снимку) моментальный снимок статической таблицы обновляется в начале каждого микробатча. Это означает, что пришедшие позднее записи измерений не применяются задним числом к данным, которые уже обработаны. Если требуется применять изменения ретроактивно, используйте материализованное представление или реструктурируйте конвейер.

Оптимизация производительности конвейера

Примените эти методы для снижения затрат на вычисления и ускорения обновлений конвейера.

Дополнительные сведения см. в разделе "Материализованные представления" и "Оптимизация обработки с сохранением состояния" с помощью водяных знаков.

Избегайте небольших файлов

Слишком частый запуск конвейера на источнике данных с низким объемом приводит к записи большого количества небольших файлов в облачное хранилище данных. Небольшие файлы ухудшают производительность чтения, так как для каждого файла требуется отдельный поиск метаданных и операции ввода-вывода с возвратом. Кроме того, API облачного хранилища ограничивают операции перечисления на больших масштабах. Чтобы избежать этого, выберите интервал триггера, соответствующий объему ваших данных: запускайте активированные конвейеры по расписанию, которое позволяет накапливать значимый объем данных между обновлениями, а не непрерывно.

Обработка отклонений данных

Перекос данных возникает, когда значения в ключе соединения или группировки по разным разделам распределяются неравномерно, это приводит к тому, что небольшое количество задач обрабатывает большую часть данных. Это создает горячие точки, которые увеличивают конечное время обновления. Используйте кластеризацию жидкости для устранения отклонений в хранимых таблицах. Для устранения смещения, которое происходит во время выполнения вычислений, насыщайте сильно смещенные ключи, добавляя случайный суффикс сегмента перед группировкой и агрегацией в два этапа.

Дополнительные сведения см. в разделе "Использование кластеризации жидкости" для макета данных.

Используйте инкрементное обновление для материализованных представлений

При использовании материализованного представления для крупной агрегации декларативные конвейеры Lakeflow Spark пытаются инкрементно обновить его — обрабатывая только исходящие изменения после последнего обновления, а не пересчитывая полный набор результатов. Добавочное обновление значительно дешевле, чем повторное выполнение запроса с нуля на каждом триггере конвейера. Чтобы максимально повысить вероятность того, что материализованное представление можно обновлять постепенно, записывать простые, детерминированные запросы агрегирования и избегать конструкций, которые предотвращают добавочную обработку, например недетерминированные функции.

См. инкрементальное обновление материализованных представлений.

Оптимизация соединений

Для соединений, где одна из таблиц является небольшой таблицей измерений, добавьте подсказку broadcast, чтобы указать Spark транслировать меньшую таблицу всем экзекьюторам, а не выполнять соединение с перемешиванием.

SQL

CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT o.*, /*+ BROADCAST(p) */ p.product_name, p.category
FROM orders o
JOIN products p ON o.product_id = p.product_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast

@dp.materialized_view
def enriched_orders():
    orders = spark.read.table("orders")
    products = spark.read.table("products")
    return orders.join(broadcast(products), "product_id")

Для соединений между временными рядами (например, поиск ближайшего события в диапазоне времени) используйте условие соединения диапазона и убедитесь, что обе стороны имеют метку времени при присоединении потоков или рассмотрите возможность предварительного объединения событий во временные интервалы перед присоединением.

Мониторинг конвейеров

Журнал событий конвейера является основным примитивом наблюдаемости в Декларативных конвейерах Spark Lakeflow. Каждый запуск конвейера записывает структурированные записи в журнал событий, охватывающий ход выполнения, результаты ожидания качества данных, происхождение данных и сведения об ошибке. Журнал событий — это таблица Delta, которую можно запрашивать напрямую.

Чтобы запросить журнал событий, не зная базовый путь к хранилищу, используйте event_log() табличную функцию в общем кластере или хранилище SQL:

SELECT * FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 100;

Создавайте панели мониторинга качества данных, запрашивая журнал событий для метрик ожидания. Столбец details содержит вложенную структуру JSON с счетчиками успехов и неудач для каждого ограничения, которые можно использовать для отслеживания тенденций качества с течением времени и оповещения о регрессиях.

Для оповещения на основе событий используйте крючки событий, чтобы активировать пользовательские веб-перехватчики или службы уведомлений (например, Slack или PagerDuty), если конвейер терпит неудачу или пороговое значение качества данных нарушается. Перехватчики событий — это функции Python, которые выполняются в ответ на события конвейера.

Дополнительные сведения см. в разделе "Мониторинг конвейеров", "Журнал событий конвейера" и "Определение настраиваемого мониторинга конвейеров с помощью перехватчиков событий".

Использование бессерверных вычислений

Databricks рекомендует бессерверные вычисления для новых конвейеров. При использовании бессерверных конфигураций кластера нет ручной конфигурации. Databricks автоматически управляет инфраструктурой. Бессерверные конвейеры используют расширенное автомасштабирование, которое может масштабироваться как горизонтально (больше исполнителей), так и вертикально (размер исполнителя) в ответ на требования рабочей нагрузки. Бессерверные конвейеры всегда используют Каталог Unity, поэтому управление и отслеживание происхождения встроены по умолчанию.

Дополнительные сведения см. в разделе "Настройка бессерверного конвейера".

Упорядочение конвейеров с помощью архитектуры медальона

Архитектура медальона упорядочивает данные в три логических слоя — бронзовую, серебряную и золотую — каждый из которых имеет определенную цель. Сопоставление типов наборов данных конвейеров декларативного типа Lakeflow Spark с правильным уровнем обеспечивает ясное разграничение обязанностей каждого уровня и упрощает поддержку конвейеров.

  • Бронза: Используйте потоковые таблицы для приема необработанных данных из облачного хранилища, автобусов сообщений или источников CDC. Бронзовые таблицы сохраняют необработанные исходные данные с минимальным преобразованием, что обеспечивает возможность повторной обработки серебряных или золотых слоев из источника в бронзовом слое при изменении требований.
  • Silver: используйте таблицы потоковой передачи для добавочных преобразований на уровне строк (фильтрация, очистка и синтаксический анализ). Используйте материализованные представления, когда логика серебряного слоя включает соединения с таблицами измерений или сложные агрегации, которые получают преимущество от добавочного обновления.
  • Золото: Используйте материализованные представления для предварительного вычисления агрегатов, метрик и сводок, предоставляемых панелям мониторинга, средствам отчетов и последующим потребителям.

Разделяйте процессы приема (бронзы) и преобразования (серебра и золота) в отдельные графы конвейеров, где это возможно. Разделение слоев позволяет планировать, отслеживать и устранять неполадки в каждом слое независимо, а сбой в конвейере обработки не блокирует поступление новых данных на бронзовом уровне.

Дополнительные сведения см. в таблицах потоковой обработки и материализованных представлениях.