Поделиться через


Преобразование данных с помощью разностных динамических таблиц

В этой статье описывается, как использовать разностные динамические таблицы для объявления преобразований в наборах данных и указания способа обработки записей с помощью логики запроса. Он также содержит некоторые примеры распространенных шаблонов преобразования, которые могут быть полезны при создании конвейеров Delta Live Tables.

Набор данных можно определить для любого запроса, возвращающего кадр данных. Встроенные операции Apache Spark, определяемые пользователем функции, пользовательские логики и модели MLflow можно использовать в качестве преобразований в конвейере Delta Live Tables. После приема данных в конвейер Delta Live Tables можно определить новые наборы данных для вышестоящий источников для создания новых потоковых таблиц, материализованных представлений и представлений.

Сведения о том, как эффективно выполнять обработку с отслеживанием состояния с помощью разностных динамических таблиц, см. в статье "Оптимизация обработки с отслеживанием состояния" в разностных динамических таблицах с подложками.

Когда следует использовать представления, материализованные представления и таблицы потоковой передачи

Чтобы убедиться, что конвейеры эффективны и поддерживаются, выберите лучший тип набора данных при реализации запросов конвейера.

Рассмотрите возможность использования представления, когда:

  • У вас есть большой или сложный запрос, который вы хотите разбить на более удобные запросы.
  • Вы хотите проверить промежуточные результаты с помощью ожиданий.
  • Вы хотите сократить затраты на хранение и вычислительные ресурсы и не требовать материализации результатов запроса. Так как таблицы материализованы, они требуют дополнительных вычислительных ресурсов и ресурсов хранилища.

Рекомендуется использовать материализованное представление, когда:

  • Несколько подчиненных запросов используют таблицу. Так как представления вычисляются по запросу, представление вычисляется повторно при каждом запросе представления.
  • Другие конвейеры, задания или запросы используют таблицу. Поскольку представления не материализованы, их можно использовать только в одном конвейере.
  • Вы хотите просмотреть результаты запроса во время разработки. Так как таблицы материализованы и могут просматриваться и запрашиваться за пределами конвейера, использование таблиц во время разработки может помочь проверить правильность вычислений. После проверки преобразуйте запросы, которые не требуют материализации в представления.

Рекомендуется использовать таблицу потоковой передачи, когда:

  • Запрос определяется для источника данных, который постоянно или постепенно растет.
  • Результаты запроса должны вычисляться постепенно.
  • Для конвейера необходим высокий уровень пропускной способности и низкая задержка.

Примечание.

Таблицы потоковой передачи всегда определяются для источников потоковой передачи. Вы также можете использовать источники потоковой передачи для APPLY CHANGES INTO применения обновлений из веб-каналов CDC. См . РАЗДЕЛ API APPLY CHANGES: Упрощение записи измененных данных в разностных динамических таблицах.

Объединение потоковых таблиц и материализованных представлений в одном конвейере

Таблицы потоковой передачи наследуют гарантии обработки структурированной потоковой передачи Apache Spark и настраиваются для обработки запросов из источников данных только для добавления, где новые строки всегда вставляются в исходную таблицу, а не изменяются.

Примечание.

Хотя по умолчанию для потоковых таблиц требуются источники данных только для добавления, если источник потоковой передачи является другой потоковой таблицей, требующей обновления или удаления, можно переопределить это поведение с помощью флага skipChangeCommits.

Распространенный шаблон потоковой передачи включает прием исходных данных для создания начальных наборов данных в конвейере. Эти начальные наборы данных обычно называются бронзовыми таблицами и часто выполняют простые преобразования.

Напротив, конечные таблицы в конвейере, обычно называемые золотыми таблицами, часто требуют сложных агрегатов или чтения из источников, которые являются целевыми объектами APPLY CHANGES INTO операции. Поскольку эти операции по сути создают обновления, а не добавляются, они не поддерживаются как входные данные для потоковых таблиц. Эти преобразования лучше подходят для материализованных представлений.

Смешав потоковые таблицы и материализованные представления в один конвейер, вы можете упростить конвейер, избежать дорогостоящей повторной приема или повторной обработки необработанных данных и получить полную мощность SQL для вычисления сложных агрегатов по эффективно закодированному и фильтруемому набору данных. Такой тип смешанной обработки показан в следующем примере:

Примечание.

В этих примерах используется автозагрузчик для загрузки файлов из облачного хранилища. Для загрузки файлов с помощью автозагрузчика в конвейере с поддержкой каталога Unity необходимо использовать внешние расположения. Дополнительные сведения об использовании каталога Unity с разностными динамическими таблицами см. в статье Использование каталога Unity с конвейерами таблиц Delta Live Tables.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

Дополнительные сведения об использовании автоматического загрузчика для эффективного чтения файлов JSON из службы хранилища Azure для добавочной обработки.

Статические соединения потоковой передачи

Потоковые статические соединения являются хорошим выбором при денормализации непрерывного потока данных только для добавления с помощью таблицы статических измерений.

При каждом обновлении конвейера новые записи из потока присоединяются к самому текущему моментальному снимку статической таблицы. Если записи добавляются или обновляются в статической таблице после обработки соответствующих данных из потоковой таблицы, результирующие записи не пересчитываются, если не выполняется полное обновление.

В конвейерах, настроенных для запуска, статическая таблица возвращает результаты по истечении времени запуска обновления. В конвейерах, настроенных для непрерывного выполнения, каждый раз, когда таблица обрабатывает обновление, запрашивается последняя версия статической таблицы.

Ниже приведен пример статитического соединения потока:

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

Эффективное вычисление статистических выражений

Таблицы потоковой передачи можно использовать для постепенного вычисления простых агрегатов распределения, таких как число, мин, макс, сумма и алгебраические агрегаты, такие как среднее или стандартное отклонение. Databricks рекомендует выполнять добавочное агрегирование запросов с ограниченным количеством групп, например запрос с предложением GROUP BY country. После каждого обновления считываются только новые входные данные.

Дополнительные сведения о написании запросов Delta Live Tables, выполняющих добавочные агрегаты, см. в статье "Выполнение агрегирования с помощью подложек".

Использование моделей MLflow в конвейере разностных динамических таблиц

Примечание.

Чтобы использовать модели MLflow в конвейере с поддержкой каталога Unity, необходимо настроить конвейер для использования preview канала. Чтобы использовать current канал, необходимо настроить конвейер для публикации в хранилище метаданных Hive.

Вы можете использовать обученные MLflow модели в конвейерах Delta Live Tables. Модели MLflow рассматриваются как преобразования в Azure Databricks, что означает, что они действуют на входных данных Spark DataFrame и возвращают результаты в качестве кадра данных Spark. Так как разностные динамические таблицы определяют наборы данных для кадров данных, вы можете преобразовать рабочие нагрузки Apache Spark, использующие MLflow в разностные динамические таблицы с несколькими строками кода. Дополнительные сведения об управлении жизненным циклом машинного обучения с помощью MLflow.

Если у вас уже есть записная книжка Python, вызывающая модель MLflow, можно адаптировать этот код к разностным динамическим таблицам с помощью @dlt.table декоратора и обеспечения определения функций для возврата результатов преобразования. Разностные динамические таблицы по умолчанию не устанавливают MLflow, поэтому убедитесь, что вы %pip install mlflow и импортируете mlflow и dlt в верхней части записной книжки. Общие сведения о синтаксисе Delta Live Table см. в примере : прием и обработка данных о именах детей в Нью-йорке.

Чтобы использовать модели MLflow в разностных динамических таблицах, выполните следующие действия.

  1. Получите идентификатор выполнения и имя модели MLflow. Идентификатор выполнения и имя модели используются для создания URI модели MLflow.
  2. Используйте URI для определения UDF Spark для загрузки модели MLflow.
  3. Вызовите UDF в определениях таблиц, чтобы использовать модель MLflow.

В следующем примере показан базовый синтаксис для этого шаблона:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

В качестве полного примера следующий код определяет UDF Spark с именем loaded_model_udf , который загружает модель MLflow, обученную по данным риска кредита. Столбцы данных, используемые для прогнозирования, передаются в качестве аргумента в UDF. Таблица loan_risk_predictions вычисляет прогнозы для каждой строки в loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Сохранение удалений или обновлений вручную

Разностные динамические таблицы позволяют вручную удалять или обновлять записи из таблицы и выполнять операцию обновления для повторной компиляции подчиненных таблиц.

По умолчанию Разностные динамические таблицы перекомпилируют результаты на основе входных данных при каждом обновлении конвейера, поэтому необходимо убедиться, что удаленная запись не перезагрузится из исходных данных. pipelines.reset.allowed Задание свойства таблицы, чтобы false предотвратить обновление таблицы, но не предотвращает добавочную запись в таблицы или предотвращает поток новых данных в таблицу.

На следующей схеме показан пример использования двух таблиц потоковой передачи:

  • raw_user_table прием необработанных данных пользователя из источника.
  • bmi_table последовательно вычисляет оценки индекса массы тела (ИМТ), используя значения веса и роста из raw_user_table.

Вы хотите вручную удалить или обновить записи пользователей из и raw_user_table перекомпьютировать его bmi_table.

Схема сохранения данных

В следующем коде показано, как задать pipelines.reset.allowed свойство таблицы, чтобы false отключить полное обновление raw_user_table , чтобы с течением времени сохранялись предполагаемые изменения, но при запуске обновления конвейера перекомпьютеризованы следующие таблицы:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);