Поделиться через


Преобразование данных с помощью конвейеров

В этой статье описывается, как использовать конвейеры для объявления преобразований в наборах данных и указания способа обработки записей с помощью логики запроса. Он также содержит примеры распространенных шаблонов преобразования для создания конвейеров.

Набор данных можно определить для любого запроса, возвращающего DataFrame. Встроенные операции Apache Spark, UDF-функции, пользовательская логика и модели MLflow можно использовать в качестве преобразований в декларативных конвейерах Lakeflow Spark. После приема данных в конвейер можно определить новые наборы данных на основе вышестоящих источников для создания новых потоковых таблиц, материализованных представлений и представлений.

Сведения о том, как эффективно выполнять состояние-ориентированную обработку в конвейере, см. в разделе "Оптимизация состояния-ориентированной обработки с помощью водяных знаков".

Когда следует использовать представления, материализованные представления и потоковые таблицы

При реализации запросов конвейера выберите лучший тип набора данных, чтобы убедиться, что они эффективны и поддерживаются.

Рассмотрите возможность использования представления для выполнения следующих действий:

  • Разделите большой или сложный запрос, чтобы он стал более управляемым и легче обрабатывать.
  • Проверьте промежуточные результаты с помощью ожиданий.
  • Уменьшите затраты на хранение и вычислительные ресурсы для результатов, которые не нужно сохранять. Так как таблицы материализованы, они требуют дополнительных вычислительных ресурсов и ресурсов хранилища.

Рекомендуется использовать материализованное представление, когда:

  • Несколько последующих запросов обрабатывают таблицу. Так как представления вычисляются по запросу, представление вычисляется повторно при каждом запросе представления.
  • Другие каналы, задания или запросы потребляют таблицу. Поскольку представления не материализованы, их можно использовать только в одном конвейере.
  • Вы хотите просмотреть результаты запроса во время разработки. Так как таблицы материализованы и могут просматриваться и запрашиваться за пределами конвейера, использование таблиц во время разработки может помочь проверить правильность вычислений. После проверки преобразуйте запросы, которые не требуют материализации в представления.

Подумайте об использовании потоковой таблицы, когда:

  • Запрос определяется для источника данных, который постоянно или постепенно растет.
  • Результаты запроса должны вычисляться постепенно.
  • Конвейеру требуется высокая пропускная способность и низкая задержка.

Замечание

Таблицы потоковых данных всегда определяются по отношению к потоковым источникам. Вы также можете использовать источники потоковой передачи с AUTO CDC ... INTO для применения обновлений из потоков данных CDC. См . API AUTO CDC: упрощение отслеживания изменений с помощью конвейеров.

Исключение таблиц из целевой схемы

Если необходимо вычислить промежуточные таблицы, не предназначенные для внешнего потребления, можно предотвратить их публикацию в схеме, используя ключевое слово TEMPORARY. Временные таблицы по-прежнему хранят и обрабатывают данные в соответствии с семантикой декларативных конвейеров Lakeflow Spark, но не должны быть доступны за пределами текущего конвейера. Временная таблица сохраняется в течение всего срока существования конвейера, создающего её. Используйте следующий синтаксис для объявления временных таблиц:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Питон

@dp.table(
  temporary=True)
def temp_table():
  return ("...")

Объединение потоковых таблиц и материализованных представлений в одном конвейере

Таблицы потоковой передачи наследуют гарантии обработки структурированной потоковой передачи Apache Spark и настраиваются для обработки запросов из добавляемых источников данных, где новые строки всегда вставляются в исходную таблицу без изменений.

Замечание

Хотя по умолчанию для потоковых таблиц требуются источники данных, поддерживающие только добавление, если источник потоковой передачи является другой потоковой таблицей, требующей обновлений или удаления, можно переопределить это поведение с помощью флага skipChangeCommits.

Распространенный шаблон потоковой обработки данных включает прием исходных данных для создания начальных наборов данных в потоке обработки. Эти начальные наборы данных обычно называются бронзовыми таблицами и часто выполняют простые преобразования.

Напротив, конечные таблицы в конвейере, обычно называемые золотыми таблицами, требуют сложных агрегаций или чтения из целевых объектов операции AUTO CDC ... INTO. Поскольку эти операции по сути создают обновления, а не добавляются, они не поддерживаются как входные данные для потоковых таблиц. Эти преобразования лучше подходят для материализованных представлений.

Смешав потоковые таблицы и материализованные представления в один конвейер, вы можете упростить конвейер, избежать дорогостоящей повторной приема или повторной обработки необработанных данных и получить полную мощность SQL для вычисления сложных агрегатов по эффективно закодированному и фильтруемому набору данных. В следующем примере показан этот тип смешанной обработки:

Замечание

В этих примерах используется автозагрузчик для загрузки файлов из облачного хранилища. Чтобы загрузить файлы с помощью Auto Loader в конвейере с поддержкой каталога Unity, нужно применять внешние расположения . Дополнительные сведения об использовании каталога Unity с конвейерами см. в статье "Использование каталога Unity с конвейерами".

Питон

@dp.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dp.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("streaming_bronze").where(...)

@dp.materialized_view
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.read.table("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
  "abfss://path/to/raw/data",
  format => "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id

Дополнительные сведения о постепенной загрузке JSON-файлов из хранилища Azure с использованием автозагрузчика .

статические потоки соединений

Потоково-статические соединения являются хорошим выбором при денормализации непрерывного потока данных, доступных только для добавления, с преимущественно статической таблицей измерений.

При каждом обновлении района распределения новые записи из потока присоединяются к последнему моментальному снимку статической таблицы. Если записи добавляются или обновляются в статической таблице после обработки соответствующих данных из потоковой таблицы, результирующие записи не пересчитываются, если не выполняется полное обновление.

В конвейерах, настроенных для выполнения по триггеру, статическая таблица возвращает результаты на момент начала обновления. В конвейерах, настроенных для непрерывного выполнения, последняя версия статической таблицы запрашивается каждый раз, когда таблица обрабатывает обновление.

Ниже приведен пример потоково-статического соединения:

Питон

@dp.table
def customer_sales():
  return spark.readStream.table("sales").join(spark.read.table("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
  INNER JOIN LEFT customers USING (customer_id)

эффективно вычислять сводные данные

Таблицы потоковой передачи можно использовать для постепенного вычисления простых агрегатов распределения, таких как число, мин, макс, сумма и алгебраические агрегаты, такие как среднее или стандартное отклонение. Databricks рекомендует инкрементальное агрегирование запросов с ограниченным числом групп, например, запрос с предложением GROUP BY country. С каждым обновлением считываются только новые входные данные.

Дополнительные сведения о написании запросов декларативных конвейеров Lakeflow Spark, выполняющих инкрементные агрегации, см. в разделе «Выполнение оконных агрегатов с метками времени».

Использование моделей MLflow в декларативных конвейерах Spark Lakeflow

Замечание

Чтобы использовать модели MLflow в конвейере с поддержкой каталога Unity, необходимо настроить конвейер для использования канала preview. Чтобы использовать канал current, необходимо настроить конвейер для публикации в хранилище метаданных Hive.

В конвейерах можно использовать обученные MLflow модели. Модели MLflow рассматриваются как преобразования в Azure Databricks, что означает, что они действуют на входной Spark DataFrame и возвращают результаты в виде Spark DataFrame. Поскольку декларативные конвейеры Lakeflow Spark определяют наборы данных для DataFrames, вы можете преобразовать рабочие нагрузки Apache Spark, использующие MLflow, в конвейеры всего несколькими строками кода. Дополнительные сведения о MLflow см. в разделе MLflow для жизненного цикла модели машинного обучения.

Если у вас уже есть скрипт Python, вызывающий модель MLflow, этот код можно адаптировать к конвейеру с помощью @dp.table или @dp.materialized_view декоратора и обеспечить определение функций для возврата результатов преобразования. Декларативные конвейеры Spark Lakeflow по умолчанию не устанавливают MLflow, поэтому убедитесь, что вы установили библиотеки MLflow с помощью %pip install mlflow, а также импортировали mlflow и dp в верхней части вашего исходного кода. Общие сведения о синтаксисе конвейера см. в статье "Разработка кода конвейера с помощью Python".

Чтобы использовать модели MLflow в конвейерах, выполните следующие действия.

  1. Получите идентификатор запуска и имя модели MLflow. Идентификатор выполнения и имя модели используются для создания URI модели MLflow.
  2. Используйте URI для определения UDF для Spark для загрузки модели MLflow.
  3. Вызовите UDF в определениях таблиц, чтобы использовать модель MLflow.

В следующем примере показан базовый синтаксис для этого шаблона:

%pip install mlflow

from pyspark import pipelines as dp
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dp.materialized_view
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

В качестве полного примера следующий код определяет UDF Spark с именем loaded_model_udf, который загружает модель MLflow, обученную по данным риска кредита. Столбцы данных, используемые для прогнозирования, передаются в качестве аргумента функции UDF. Таблица loan_risk_predictions вычисляет прогнозы для каждой строки в loan_risk_input_data.

%pip install mlflow

from pyspark import pipelines as dp
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dp.materialized_view(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Сохранить выполненные вручную удаления или обновления

Декларативные конвейеры Spark Lakeflow позволяют вручную удалять или обновлять записи из таблицы и выполнять операцию обновления для повторной компиляции подчиненных таблиц.

По умолчанию конвейеры перекомпилируют результаты таблицы на основе входных данных при каждом обновлении, поэтому необходимо убедиться, что удаленная запись не перезагружается из исходных данных. Установка свойства таблицы pipelines.reset.allowed на false предотвращает обновление таблицы, но не мешает добавочным записям в таблицы и поступлению новых данных в таблицу.

На следующей схеме показан пример использования двух потоковых таблиц.

  • raw_user_table обрабатывает необработанные пользовательские данные из источника.
  • bmi_table постепенно вычисляет показатели ИМТ, используя вес и рост от raw_user_table.

Вы хотите вручную удалить или обновить записи пользователей из raw_user_table и перекомпьютировать bmi_table.

Сохранить схему данных

В следующем коде показано, как установить свойство таблицы pipelines.reset.allowed в false, чтобы отключить полное обновление для raw_user_table, таким образом обеспечивая сохранение запланированных изменений со временем, при этом подчиненные таблицы пересчитываются при запуске обновления конвейера.

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);