Руководство. Запуск первого конвейера разностных динамических таблиц

Внимание

Бессерверные конвейеры DLT доступны в общедоступной предварительной версии. Чтобы узнать о включении конвейеров DLT без сервера, обратитесь к группе учетных записей Azure Databricks.

В этом руководстве показано, как настроить конвейер Delta Live Tables из кода в записной книжке Databricks и запустить конвейер, активировав обновление конвейера. В этом руководстве приведен пример конвейера для приема и обработки примера набора данных с примером кода с помощью интерфейсов Python и SQL . Инструкции, описанные в этом руководстве, также можно использовать для создания конвейера с любыми записными книжками с правильно определенным синтаксисом Delta Live Table.

Можно настроить конвейеры и триггеры конвейеров Delta Live Tables с помощью пользовательского интерфейса рабочей области Azure Databricks или автоматических средств, таких как API, CLI, Наборы ресурсов Databricks или как задача в рабочем процессе Databricks. Чтобы ознакомиться с функциями и функциями Delta Live Tables, Databricks рекомендует сначала использовать пользовательский интерфейс для создания и запуска конвейеров. Кроме того, при настройке конвейера в пользовательском интерфейсе разностные динамические таблицы создают конфигурацию JSON для конвейера, который можно использовать для реализации программных рабочих процессов.

Чтобы продемонстрировать функциональные возможности Delta Live Tables, в примерах этого руководства скачайте общедоступный набор данных. Однако Databricks имеет несколько способов подключения к источникам данных и приема данных, которые конвейеры, реализующие реальные варианты использования, будут использоваться. См . сведения о приеме данных с помощью разностных динамических таблиц.

Требования

  • Чтобы запустить бессерверный конвейер, необходимо иметь разрешение на создание кластера или доступ к политике кластера, определяющей кластер Delta Live Tables. Перед запуском конвейера среда выполнения Delta Live Tables создает кластер и завершается ошибкой, если необходимые разрешения отсутствуют.

  • Чтобы использовать примеры, приведенные в этом руководстве, рабочая область должна включать каталог Unity.

  • У вас должны быть следующие разрешения в каталоге Unity:

    • READ VOLUME и WRITE VOLUME, или ALL PRIVILEGES, для тома my-volume .
    • USE SCHEMA или ALL PRIVILEGES для default схемы.
    • USE CATALOG или ALL PRIVILEGES для main каталога.

    Чтобы задать эти разрешения, ознакомьтесь с правами администратора Databricks или каталога Unity и защищаемыми объектами.

  • Примеры, приведенные в этом руководстве, используют том каталога Unity для хранения примеров данных. Чтобы использовать эти примеры, создайте том и используйте каталог, схему и имена томов, чтобы задать путь тома, используемый примерами.

Примечание.

Если в рабочей области нет включенного каталога Unity, записные книжки с примерами, для которых не требуется каталог Unity, присоединены к этой статье. Чтобы использовать эти примеры, выберите Hive metastore в качестве параметра хранилища при создании конвейера.

Где выполняются запросы Delta Live Tables?

Запросы Delta Live Tables в основном реализуются в записных книжках Databricks, но разностные динамические таблицы не предназначены для интерактивного выполнения в ячейках записных книжек. Выполнение ячейки, содержащей синтаксис Delta Live Table в записной книжке Databricks, приводит к ошибке. Чтобы выполнить запросы, необходимо настроить записные книжки в рамках конвейера.

Внимание

  • Вы не можете полагаться на порядок выполнения записных книжек по ячейкам при написании запросов для разностных динамических таблиц. Delta Live Tables вычисляет и запускает весь код, определенный в записных книжках, но имеет другую модель выполнения, чем записная книжка Run all command.
  • Нельзя смешивать языки в одном файле исходного кода разностных динамических таблиц. Например, записная книжка может содержать только запросы Python или SQL-запросы. Если в конвейере необходимо использовать несколько языков, используйте несколько записных книжек или файлов в конвейере.

Вы также можете использовать код Python, хранящийся в файлах. Например, можно создать модуль Python, который можно импортировать в конвейеры Python или определить определяемые пользователем функции Python для использования в запросах SQL. Дополнительные сведения об импорте модулей Python см. в статье "Импорт модулей Python" из папок Git или файлов рабочей области. Дополнительные сведения об использовании определяемых пользователем функций Python см. в статье о определяемых пользователем скалярных функциях — Python.

Пример: прием и обработка данных о именах детей в Нью-Йорке

В примере в этой статье используется общедоступный набор данных, содержащий записи имен ребенка штата Нью-Йорк. В этих примерах демонстрируется использование конвейера "Динамические таблицы Delta Live Tables" для:

  • Чтение необработанных CSV-данных из общедоступного набора данных в таблицу.
  • Чтение записей из таблицы необработанных данных и использование ожидаемых данных Delta Live Tables для создания новой таблицы, содержащей очищенные данные.
  • Используйте очищенные записи в качестве входных данных в запросы Delta Live Tables, которые создают производные наборы данных.

Этот код демонстрирует упрощенный пример архитектуры медальона. См. статью "Что такое архитектура медальона lakehouse?".

Реализации этого примера предоставляются для интерфейсов Python и SQL . Вы можете выполнить действия по созданию записных книжек, содержащих пример кода, или перейти к созданию конвейера и использовать одну из записных книжек, указанных на этой странице.

Реализация конвейера разностных динамических таблиц с помощью Python

Код Python, создающий наборы данных Delta Live Tables, должен возвращать кадры данных, знакомые пользователям с PySpark или Pandas для spark. Для пользователей, незнакомых с DataFrames, Databricks рекомендует использовать интерфейс SQL. См. статью "Реализация конвейера разностных динамических таблиц" с помощью SQL.

В модуле dlt реализованы все API Python для разностных динамических таблиц. Код конвейера Delta Live Tables, реализованный с помощью Python, должен явно импортировать dlt модуль в верхней части записных книжек и файлов Python. Разностные динамические таблицы отличаются от многих скриптов Python ключевым способом: вы не вызываете функции, которые выполняют прием и преобразование данных для создания наборов данных Delta Live Tables. Вместо этого Delta Live Tables интерпретирует функции декоратора из dlt модуля во всех файлах, загруженных в конвейер, и создает граф потока данных.

Чтобы реализовать пример в этом руководстве, скопируйте и вставьте следующий код Python в новую записную книжку Python. Каждый пример фрагмента кода следует добавить в свою ячейку в записной книжке в описанном порядке. Дополнительные сведения о создании записных книжек см. в статье "Создание записной книжки".

Примечание.

При создании конвейера с интерфейсом Python по умолчанию имена таблиц определяются именами функций. Например, в следующем примере Python создаются три таблицы с именем baby_names_raw, baby_names_preparedи top_baby_names_2021. Имя таблицы можно переопределить с помощью name параметра. См. статью "Создание материализованного представления разностных динамических таблиц" или потоковой таблицы.

Импорт модуля Delta Live Tables

В модуле dlt реализованы все API Python для разностных динамических таблиц. Явно импортируйте dlt модуль в верхней части записных книжек и файлов Python.

В следующем примере показан этот импорт, а также операторы импорта для pyspark.sql.functions.

import dlt
from pyspark.sql.functions import *

Скачивание данных

Чтобы получить данные в этом примере, вы скачайте CSV-файл и сохраните его в томе следующим образом:

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

Замените <catalog-name>, <schema-name>а также <volume-name> именами каталогов, схем и томов для тома каталога Unity.

Создание таблицы из файлов в хранилище объектов

Delta Live Tables поддерживает загрузку данных из всех форматов, поддерживаемых Azure Databricks. См . параметры формата данных.

Декоратор @dlt.table сообщает Delta Live Table создать таблицу, содержащую результат DataFrame возвращаемой функцией. @dlt.table Добавьте декоратор перед любым определением функции Python, возвращающим кадр данных Spark, чтобы зарегистрировать новую таблицу в разностных динамических таблицах. В следующем примере показано использование имени функции в качестве имени таблицы и добавление описательного комментария в таблицу:

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

Добавление таблицы из набора данных вышестоящий в конвейере

Вы можете использовать dlt.read() для чтения данных из других наборов данных, объявленных в текущем конвейере Delta Live Tables. Объявление новых таблиц таким образом создает зависимость, которую Delta Live Tables автоматически разрешает перед выполнением обновлений. Следующий код также содержит примеры мониторинга и обеспечения качества данных с ожидаемыми ожиданиями. См. статью Управление качеством данных с помощью Delta Live Tables.

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

Создание таблицы с обогащенными представлениями данных

Так как разностные динамические таблицы обрабатывают обновления конвейеров в виде ряда граф зависимостей, вы можете объявить высокообогащенные представления, которые могут создавать панели мониторинга, бизнес-аналитику и аналитику, объявляя таблицы с определенной бизнес-логикой.

Таблицы в разностных динамических таблицах эквивалентны концептуально материализованным представлениям. В то время как традиционные представления логики выполнения Spark каждый раз при запросе представления, таблица Delta Live Table хранит последнюю версию результатов запроса в файлах данных. Так как Delta Live Tables управляет обновлениями для всех наборов данных в конвейере, вы можете запланировать обновления конвейера для соответствия требованиям задержки для материализованных представлений и знать, что запросы к этим таблицам содержат самую последнюю версию доступных данных.

Таблица, определенная в следующем коде, демонстрирует концептуальное сходство с материализованным представлением, производным от вышестоящий данных в конвейере:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

Сведения о настройке конвейера, использующего записную книжку, см. в статье "Создание конвейера".

Реализация конвейера разностных динамических таблиц с помощью SQL

Databricks рекомендует delta Live Tables с SQL в качестве предпочтительного способа создания новых конвейеров ETL, приема и преобразования в Azure Databricks. Интерфейс SQL для разностных динамических таблиц расширяет стандартный sql Spark с множеством новых ключевое слово, конструкций и табличного значения функций. Эти дополнения к стандартному SQL позволяют пользователям объявлять зависимости между наборами данных и развертывать инфраструктуру рабочего класса без обучения новых инструментов или дополнительных концепций.

Для пользователей, знакомых с Кадрами данных Spark и которым нужна поддержка более обширного тестирования и операций, которые сложно реализовать с помощью SQL, например операции метапрограммирования, Databricks рекомендует использовать интерфейс Python. См . пример: прием и обработка данных о именах детей в Нью-Йорке.

Скачивание данных

Чтобы получить данные для этого примера, скопируйте следующий код, вставьте его в новую записную книжку и запустите записную книжку. Дополнительные сведения о создании записных книжек см. в статье "Создание записной книжки".

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

Замените <catalog-name>, <schema-name>а также <volume-name> именами каталогов, схем и томов для тома каталога Unity.

Создание таблицы из файлов в каталоге Unity

Для остальной части этого примера скопируйте следующие фрагменты КОДА SQL и вставьте их в новую записную книжку SQL, отдельно от записной книжки в предыдущем разделе. Каждый пример фрагмента КОДА SQL следует добавить в свою ячейку в записной книжке в описанном порядке.

Delta Live Tables поддерживает загрузку данных из всех форматов, поддерживаемых Azure Databricks. См . параметры формата данных.

Все инструкции SQL Delta Live Table используют CREATE OR REFRESH синтаксис и семантику. При обновлении конвейера Разностные динамические таблицы определяют, можно ли выполнить логически правильный результат для таблицы путем добавочной обработки или при необходимости полной повторной компиляции.

В следующем примере создается таблица, загружая данные из CSV-файла, хранящегося в томе каталога Unity:

CREATE OR REFRESH LIVE TABLE baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

Замените <catalog-name>, <schema-name>а также <volume-name> именами каталогов, схем и томов для тома каталога Unity.

Добавление таблицы из набора данных вышестоящий в конвейер

Виртуальную схему live можно использовать для запроса данных из других наборов данных, объявленных в текущем конвейере Delta Live Tables. Объявление новых таблиц таким образом создает зависимость, которую Delta Live Tables автоматически разрешает перед выполнением обновлений. Схема live — это пользовательская ключевое слово, реализованная в разностных динамических таблицах, которые можно заменить на целевую схему, если вы хотите опубликовать наборы данных. См. раздел "Использование каталога Unity" с конвейерами разностных динамических таблиц и публикацией данных из разностных динамических таблиц в хранилище метаданных Hive.

Следующий код также содержит примеры мониторинга и обеспечения качества данных с ожидаемыми ожиданиями. См. статью Управление качеством данных с помощью Delta Live Tables.

CREATE OR REFRESH LIVE TABLE baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

Создание расширенного представления данных

Так как разностные динамические таблицы обрабатывают обновления конвейеров в виде ряда граф зависимостей, вы можете объявить высокообогащенные представления, которые могут создавать панели мониторинга, бизнес-аналитику и аналитику, объявляя таблицы с определенной бизнес-логикой.

Динамические таблицы эквивалентны концептуально материализованным представлениям. В то время как традиционные представления логики выполнения Spark каждый раз, когда представление запрашивается, динамические таблицы хранят самую последнюю версию запроса в файлах данных. Так как Delta Live Tables управляет обновлениями для всех наборов данных в конвейере, вы можете запланировать обновления конвейера для соответствия требованиям задержки для материализованных представлений и знать, что запросы к этим таблицам содержат самую последнюю версию доступных данных.

Следующий код создает обогащенное материализованное представление данных вышестоящий:

CREATE OR REFRESH LIVE TABLE top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Чтобы настроить конвейер, использующий записную книжку, перейдите к созданию конвейера.

Создание конвейера

Delta Live Table создает конвейеры путем разрешения зависимостей, определенных в записных книжках или файлах (называемых исходным кодом или библиотеками) с помощью синтаксиса Delta Live Table. Каждый файл исходного кода может содержать только один язык, но в конвейере можно смешивать библиотеки разных языков.

  1. Щелкните разностные динамические таблицы на боковой панели и нажмите кнопку "Создать конвейер".
  2. Присвойте конвейеру имя.
  3. (Необязательно) Выберите поле "Бессерверный проверка", чтобы использовать полностью управляемые вычисления для этого конвейера. При выборе бессерверных параметров вычислений удаляются из пользовательского интерфейса.
  4. (Необязательно) Выберите выпуск продукта.
  5. Выберите Активировано для режима конвейера.
  6. Настройте одну или несколько записных книжек, содержащих исходный код для конвейера. В текстовом поле "Пути" введите путь к записной книжке или щелкните, Значок средства выбора файлов чтобы выбрать записную книжку.
  7. Выберите место назначения для наборов данных, опубликованных конвейером, хранилища метаданных Hive или каталога Unity. См. статью "Публикация наборов данных".
    • Хранилище метаданных Hive:
      • (Необязательно) Введите служба хранилища расположение выходных данных из конвейера. Система использует расположение по умолчанию, если оставить служба хранилища расположение пустым.
      • (Необязательно) Укажите целевую схему для публикации набора данных в хранилище метаданных Hive.
    • Каталог Unity: укажите каталог и целевую схемудля публикации набора данных в каталоге Unity.
  8. (Необязательно) Если вы не выбрали бессерверные параметры, можно настроить параметры вычислений для конвейера. Дополнительные сведения о параметрах вычислений см. в разделе "Настройка параметров конвейера" для разностных динамических таблиц.
  9. (Необязательно) Нажмите кнопку "Добавить уведомление" , чтобы настроить один или несколько адресов электронной почты для получения уведомлений о событиях конвейера. Дополнительные сведения о событиях конвейера см. в разделе "Добавление Уведомления по электронной почте".
  10. (Необязательно) Настройте дополнительные параметры для конвейера. Дополнительные сведения о параметрах см. в разделе "Настройка параметров конвейера" для разностных динамических таблиц.
  11. Нажмите кнопку Создать.

В системе отобразится страница Сведения о конвейере после нажатия кнопки Создать. Вы также можете получить доступ к конвейеру, щелкнув имя конвейера на вкладке Разностные динамические таблицы.

Запуск обновления конвейера

Чтобы запустить обновление конвейера, нажмите Значок запуска разностных динамических таблиц кнопку на верхней панели. Система возвращает сообщение с подтверждением запуска конвейера.

После успешного запуска обновления система разностных динамических таблиц выполняет следующие действия.

  1. Запускает кластер с использованием конфигурации кластера, созданной системой динамических разностных таблиц. Вы также можете указать пользовательскую конфигурацию кластера.
  2. Создает все отсутствующие таблицы и обеспечивает правильность схемы для всех существующих таблиц.
  3. Добавляет в таблицы новые доступные данные.
  4. Завершает работу кластера после завершения обновления.

Примечание.

По умолчанию для режима выполнения задано значение Production , которое развертывает временные вычислительные ресурсы для каждого обновления. Режим разработки можно использовать для изменения этого поведения, что позволяет использовать одни и те же вычислительные ресурсы для нескольких обновлений конвейера во время разработки и тестирования. См. раздел Режим разработки и рабочий режим.

Публикация наборов данных

Можно сделать наборы данных Delta Live Tables доступными для запроса, публикуя таблицы в хранилище метаданных Hive или каталог Unity. Если вы не указываете целевой объект для публикации данных, таблицы, созданные в конвейерах Delta Live Tables, могут быть доступны только другими операциями в том же конвейере. См. статью "Публикация данных из разностных динамических таблиц" в хранилище метаданных Hive и использование каталога Unity с конвейерами Delta Live Tables.

Примеры записных книжек исходного кода

Эти записные книжки можно импортировать в рабочую область Azure Databricks и использовать их для развертывания конвейера Delta Live Tables. См. раздел Создание конвейера.

Начало работы с записной книжкой Python для Delta Live Tables

Получить записную книжку

Начало работы с записной книжкой SQL для Delta Live Tables

Получить записную книжку

Примеры записных книжек исходного кода для рабочих областей без каталога Unity

Эти записные книжки можно импортировать в рабочую область Azure Databricks без включения каталога Unity и использовать их для развертывания конвейера Delta Live Tables. См. раздел Создание конвейера.

Начало работы с записной книжкой Python для Delta Live Tables

Получить записную книжку

Начало работы с записной книжкой SQL для Delta Live Tables

Получить записную книжку