Руководство. Запуск первого конвейера разностных динамических таблиц
В этом руководстве показано, как настроить конвейер Delta Live Tables из кода в записной книжке Databricks и запустить конвейер, активировав обновление конвейера. В этом руководстве приведен пример конвейера для приема и обработки примера набора данных с примером кода с помощью интерфейсов Python и SQL . Инструкции, описанные в этом руководстве, также можно использовать для создания конвейера с любыми записными книжками с правильно определенным синтаксисом Delta Live Table.
Можно настроить конвейеры и триггеры конвейеров Delta Live Tables с помощью пользовательского интерфейса рабочей области Azure Databricks или автоматических средств, таких как API, CLI, Наборы ресурсов Databricks или как задача в рабочем процессе Databricks. Чтобы ознакомиться с функциями и функциями Delta Live Tables, Databricks рекомендует сначала использовать пользовательский интерфейс для создания и запуска конвейеров. Кроме того, при настройке конвейера в пользовательском интерфейсе разностные динамические таблицы создают конфигурацию JSON для конвейера, который можно использовать для реализации программных рабочих процессов.
Чтобы продемонстрировать функциональные возможности Delta Live Tables, в примерах этого руководства скачайте общедоступный набор данных. Однако Databricks имеет несколько способов подключения к источникам данных и приема данных, которые конвейеры, реализующие реальные варианты использования, будут использоваться. См . сведения о приеме данных с помощью разностных динамических таблиц.
Требования
Чтобы запустить конвейер, необходимо иметь разрешение на создание кластера или доступ к политике кластера, определяющей кластер Delta Live Tables. Перед запуском конвейера среда выполнения Delta Live Tables создает кластер и завершается ошибкой, если необходимые разрешения отсутствуют.
Чтобы использовать примеры, приведенные в этом руководстве, рабочая область должна включать каталог Unity.
У вас должны быть следующие разрешения в каталоге Unity:
READ VOLUME
иWRITE VOLUME
, илиALL PRIVILEGES
, для томаmy-volume
.USE SCHEMA
илиALL PRIVILEGES
дляdefault
схемы.USE CATALOG
илиALL PRIVILEGES
дляmain
каталога.
Чтобы задать эти разрешения, ознакомьтесь с правами администратора Databricks или каталога Unity и защищаемыми объектами.
Примеры, приведенные в этом руководстве, используют том каталога Unity для хранения примеров данных. Чтобы использовать эти примеры, создайте том и используйте каталог, схему и имена томов, чтобы задать путь тома, используемый примерами.
Примечание.
Если в рабочей области нет включенного каталога Unity, записные книжки с примерами, для которых не требуется каталог Unity, присоединены к этой статье. Чтобы использовать эти примеры, выберите Hive metastore
в качестве параметра хранилища при создании конвейера.
Где выполняются запросы Delta Live Tables?
Запросы Delta Live Tables в основном реализуются в записных книжках Databricks, но разностные динамические таблицы не предназначены для интерактивного выполнения в ячейках записных книжек. Выполнение ячейки, содержащей синтаксис Delta Live Table в записной книжке Databricks, приводит к ошибке. Чтобы выполнить запросы, необходимо настроить записные книжки в рамках конвейера.
Внимание
- Вы не можете полагаться на порядок выполнения записных книжек по ячейкам при написании запросов для разностных динамических таблиц. Delta Live Tables вычисляет и запускает весь код, определенный в записных книжках, но имеет другую модель выполнения, чем записная книжка Run all command.
- Нельзя смешивать языки в одном файле исходного кода разностных динамических таблиц. Например, записная книжка может содержать только запросы Python или SQL-запросы. Если в конвейере необходимо использовать несколько языков, используйте несколько записных книжек или файлов в конвейере.
Вы также можете использовать код Python, хранящийся в файлах. Например, можно создать модуль Python, который можно импортировать в конвейеры Python или определить определяемые пользователем функции Python для использования в запросах SQL. Дополнительные сведения об импорте модулей Python см. в статье "Импорт модулей Python" из папок Git или файлов рабочей области. Дополнительные сведения об использовании определяемых пользователем функций Python см. в статье о определяемых пользователем скалярных функциях — Python.
Пример: прием и обработка данных о именах детей в Нью-Йорке
В примере в этой статье используется общедоступный набор данных, содержащий записи имен ребенка штата Нью-Йорк. В этих примерах демонстрируется использование конвейера "Динамические таблицы Delta Live Tables" для:
- Чтение необработанных CSV-данных из общедоступного набора данных в таблицу.
- Чтение записей из таблицы необработанных данных и использование ожидаемых данных Delta Live Tables для создания новой таблицы, содержащей очищенные данные.
- Используйте очищенные записи в качестве входных данных в запросы Delta Live Tables, которые создают производные наборы данных.
Этот код демонстрирует упрощенный пример архитектуры медальона. См. статью "Что такое архитектура медальона lakehouse?".
Реализации этого примера предоставляются для интерфейсов Python и SQL . Вы можете выполнить действия по созданию записных книжек, содержащих пример кода, или перейти к созданию конвейера и использовать одну из записных книжек, указанных на этой странице.
Реализация конвейера разностных динамических таблиц с помощью Python
Код Python, создающий наборы данных Delta Live Tables, должен возвращать кадры данных. Для пользователей, незнакомых с Python и DataFrames, Databricks рекомендует использовать интерфейс SQL. См. статью "Реализация конвейера разностных динамических таблиц" с помощью SQL.
В модуле dlt
реализованы все API Python для разностных динамических таблиц. Код конвейера Delta Live Tables, реализованный с помощью Python, должен явно импортировать dlt
модуль в верхней части записных книжек и файлов Python. Разностные динамические таблицы отличаются от многих скриптов Python ключевым способом: вы не вызываете функции, которые выполняют прием и преобразование данных для создания наборов данных Delta Live Tables. Вместо этого Delta Live Tables интерпретирует функции декоратора из dlt
модуля во всех файлах, загруженных в конвейер, и создает граф потока данных.
Чтобы реализовать пример в этом руководстве, скопируйте и вставьте следующий код Python в новую записную книжку Python. Добавьте каждый пример фрагмента кода в свою ячейку в записной книжке в описанном порядке. Дополнительные сведения о создании записных книжек см. в статье "Создание записной книжки".
При создании конвейера с интерфейсом Python по умолчанию имена таблиц определяются именами функций. Например, в следующем примере Python создаются три таблицы с именем baby_names_raw
, baby_names_prepared
и top_baby_names_2021
. Имя таблицы можно переопределить с помощью name
параметра. См. статью "Создание материализованного представления разностных динамических таблиц" или потоковой таблицы.
Внимание
Чтобы избежать непредвиденного поведения при запуске конвейера, не включайте код, который может иметь побочные эффекты в функциях, определяющих наборы данных. Дополнительные сведения см. в справочнике по Python.
Импорт модуля Delta Live Tables
В модуле dlt
реализованы все API Python для разностных динамических таблиц. Явно импортируйте dlt
модуль в верхней части записных книжек и файлов Python.
В следующем примере показан этот импорт, а также операторы импорта для pyspark.sql.functions
.
import dlt
from pyspark.sql.functions import *
Скачивание данных
Чтобы получить данные в этом примере, вы скачайте CSV-файл и сохраните его в томе следующим образом:
import os
os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"
dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")
Замените <catalog-name>
, <schema-name>
а также <volume-name>
именами каталогов, схем и томов для тома каталога Unity.
Создание таблицы из файлов в хранилище объектов
Delta Live Tables поддерживает загрузку данных из всех форматов, поддерживаемых Azure Databricks. См . параметры формата данных.
Декоратор @dlt.table
сообщает Delta Live Table создать таблицу, содержащую результат DataFrame
возвращаемой функцией. @dlt.table
Добавьте декоратор перед любым определением функции Python, возвращающим кадр данных Spark, чтобы зарегистрировать новую таблицу в разностных динамических таблицах. В следующем примере показано использование имени функции в качестве имени таблицы и добавление описательного комментария в таблицу:
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
Добавление таблицы из вышестоящего набора данных в конвейере
Вы можете использовать dlt.read()
для чтения данных из других наборов данных, объявленных в текущем конвейере Delta Live Tables. Объявление новых таблиц таким образом создает зависимость, которую Delta Live Tables автоматически разрешает перед выполнением обновлений. Следующий код также содержит примеры мониторинга и обеспечения качества данных с ожидаемыми ожиданиями. См. статью Управление качеством данных с помощью Delta Live Tables.
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
dlt.read("baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
Создание таблицы с обогащенными представлениями данных
Так как разностные динамические таблицы обрабатывают обновления конвейеров в виде ряда граф зависимостей, вы можете объявить высокообогащенные представления, которые могут создавать панели мониторинга, бизнес-аналитику и аналитику, объявляя таблицы с определенной бизнес-логикой.
Таблицы в разностных динамических таблицах эквивалентны концептуально материализованным представлениям. В отличие от традиционных представлений в Spark, которые выполняют логику при каждом запросе представления, таблица Delta Live Table сохраняет последнюю версию результатов запроса в файлах данных. Так как Delta Live Tables управляет обновлениями для всех наборов данных в конвейере, вы можете запланировать обновления конвейера для соответствия требованиям задержки для материализованных представлений и знать, что запросы к этим таблицам содержат самую последнюю версию доступных данных.
Таблица, определенная в следующем коде, демонстрирует концептуальное сходство с материализованным представлением, производным от вышестоящих данных в конвейере:
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
dlt.read("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
Сведения о настройке конвейера, использующего записную книжку, см. в статье "Создание конвейера".
Реализация конвейера разностных динамических таблиц с помощью SQL
Databricks рекомендует delta Live Tables с SQL в качестве предпочтительного способа создания новых конвейеров ETL, приема и преобразования в Azure Databricks. Интерфейс SQL для разностных динамических таблиц расширяет стандартный sql Spark с множеством новых ключевых слов, конструкций и табличного значения функций. Эти дополнения к стандартному SQL позволяют пользователям объявлять зависимости между наборами данных и развертывать инфраструктуру рабочего класса без обучения новых инструментов или дополнительных концепций.
Для пользователей, знакомых с Кадрами данных Spark и которым нужна поддержка более обширного тестирования и операций, которые сложно реализовать с помощью SQL, например операции метапрограммирования, Databricks рекомендует использовать интерфейс Python. См. статью "Реализация конвейера разностных динамических таблиц" с помощью Python.
Скачивание данных
Чтобы получить данные для этого примера, скопируйте следующий код, вставьте его в новую записную книжку и запустите записную книжку. Дополнительные сведения о создании записных книжек см. в статье "Создание записной книжки".
%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
Замените <catalog-name>
, <schema-name>
а также <volume-name>
именами каталогов, схем и томов для тома каталога Unity.
Создание таблицы из файлов в каталоге Unity
Для остальной части этого примера скопируйте следующие фрагменты КОДА SQL и вставьте их в новую записную книжку SQL, отдельно от записной книжки в предыдущем разделе. Добавьте каждый пример фрагмента КОДА SQL в свою ячейку в записной книжке в описанном порядке.
Delta Live Tables поддерживает загрузку данных из всех форматов, поддерживаемых Azure Databricks. См . параметры формата данных.
Все инструкции SQL Delta Live Table используют CREATE OR REFRESH
синтаксис и семантику. При обновлении конвейера Разностные динамические таблицы определяют, можно ли выполнить логически правильный результат для таблицы путем добавочной обработки или при необходимости полной повторной компиляции.
В следующем примере создается таблица, загружая данные из CSV-файла, хранящегося в томе каталога Unity:
CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
'/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST')
Замените <catalog-name>
, <schema-name>
а также <volume-name>
именами каталогов, схем и томов для тома каталога Unity.
Добавление таблицы из вышестоящего набора данных в конвейер
Виртуальную схему live
можно использовать для запроса данных из других наборов данных, объявленных в текущем конвейере Delta Live Tables. Объявление новых таблиц таким образом создает зависимость, которую Delta Live Tables автоматически разрешает перед выполнением обновлений. Схема live
— это пользовательское ключевое слово, реализованное в разностных динамических таблицах, которое можно заменить на целевую схему, если вы хотите опубликовать наборы данных. См. раздел "Использование каталога Unity" с конвейерами разностных динамических таблиц и публикацией данных из разностных динамических таблиц в хранилище метаданных Hive.
Следующий код также содержит примеры мониторинга и обеспечения качества данных с ожидаемыми ожиданиями. См. статью Управление качеством данных с помощью Delta Live Tables.
CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM live.baby_names_sql_raw;
Создание расширенного представления данных
Так как разностные динамические таблицы обрабатывают обновления конвейеров в виде ряда граф зависимостей, вы можете объявить высокообогащенные представления, которые могут создавать панели мониторинга, бизнес-аналитику и аналитику, объявляя таблицы с определенной бизнес-логикой.
Следующий запрос использует материализованное представление для создания обогащенного представления из вышестоящих данных. В отличие от традиционных представлений в Spark, которые выполняют логику при каждом запросе представления, материализованные представления хранят самую последнюю версию запроса в файлах данных. Так как Delta Live Tables управляет обновлениями для всех наборов данных в конвейере, вы можете запланировать обновления конвейера для соответствия требованиям задержки для материализованных представлений и знать, что запросы к этим таблицам содержат самую последнюю версию доступных данных.
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
Чтобы настроить конвейер, использующий записную книжку, перейдите к созданию конвейера.
Создание конвейера
Примечание.
- Так как вычислительные ресурсы полностью управляются для бессерверных конвейеров DLT, параметры вычислений недоступны при выборе бессерверных для конвейера.
- Сведения о возможности и включении бессерверных конвейеров DLT см. в разделе "Включение бессерверных вычислений".
Delta Live Table создает конвейеры путем разрешения зависимостей, определенных в записных книжках или файлах (называемых исходным кодом или библиотеками) с помощью синтаксиса Delta Live Table. Каждый файл исходного кода может содержать только один язык, но в конвейере можно смешивать библиотеки разных языков.
- Щелкните разностные динамические таблицы на боковой панели и нажмите кнопку "Создать конвейер".
- Присвойте конвейеру имя.
- (Необязательно) Чтобы запустить конвейер с помощью бессерверных конвейеров DLT, установите флажок без сервера. При выборе бессерверных параметров вычислений удаляются из пользовательского интерфейса. См. статью "Создание полностью управляемых конвейеров с помощью разностных динамических таблиц с бессерверными вычислениями".
- (Необязательно) Выберите выпуск продукта.
- Выберите Активировано для режима конвейера.
- Настройте одну или несколько записных книжек, содержащих исходный код для конвейера. В текстовом поле "Пути" введите путь к записной книжке или щелкните, чтобы выбрать записную книжку.
- Выберите место назначения для наборов данных, опубликованных конвейером, хранилища метаданных Hive или каталога Unity. См. статью "Публикация наборов данных".
- Хранилище метаданных Hive:
- (Необязательно) Введите расположение хранилища для выходных данных из конвейера. Система использует расположение по умолчанию, если оставить место хранения пустым.
- (Необязательно) Укажите целевую схему для публикации набора данных в хранилище метаданных Hive.
- Каталог Unity: укажите каталог и целевую схему для публикации набора данных в каталоге Unity.
- Хранилище метаданных Hive:
- (Необязательно) Если вы не выбрали бессерверные параметры, можно настроить параметры вычислений для конвейера. Дополнительные сведения о параметрах вычислений см. в разделе "Настройка параметров конвейера" для разностных динамических таблиц.
- (Необязательно) Нажмите кнопку "Добавить уведомление" , чтобы настроить один или несколько адресов электронной почты для получения уведомлений о событиях конвейера. Дополнительные сведения о событиях конвейера см. в разделе "Добавление Уведомления по электронной почте".
- (Необязательно) Настройте дополнительные параметры для конвейера. Дополнительные сведения о параметрах см. в разделе "Настройка параметров конвейера" для разностных динамических таблиц.
- Нажмите кнопку Создать.
После нажатия кнопки "Создать" появится страница сведений о конвейере. Вы также можете получить доступ к конвейеру, щелкнув имя конвейера на вкладке Разностные динамические таблицы.
Запуск обновления конвейера
Чтобы запустить обновление конвейера, нажмите кнопку на верхней панели. Система возвращает сообщение с подтверждением запуска конвейера.
После успешного запуска обновления система разностных динамических таблиц выполняет следующие действия.
- Запускает кластер с использованием конфигурации кластера, созданной системой динамических разностных таблиц. Вы также можете указать пользовательскую конфигурацию кластера.
- Создает все отсутствующие таблицы и обеспечивает правильность схемы для всех существующих таблиц.
- Добавляет в таблицы новые доступные данные.
- Завершает работу кластера после завершения обновления.
Примечание.
По умолчанию для режима выполнения задано значение Production , которое развертывает временные вычислительные ресурсы для каждого обновления. Режим разработки можно использовать для изменения этого поведения, что позволяет использовать одни и те же вычислительные ресурсы для нескольких обновлений конвейера во время разработки и тестирования. См. раздел Режим разработки и рабочий режим.
Публикация наборов данных
Можно сделать наборы данных Delta Live Tables доступными для запроса, публикуя таблицы в хранилище метаданных Hive или каталог Unity. Если вы не указываете целевой объект для публикации данных, таблицы, созданные в конвейерах Delta Live Tables, могут быть доступны только другими операциями в том же конвейере. См. статью "Публикация данных из разностных динамических таблиц" в хранилище метаданных Hive и использование каталога Unity с конвейерами Delta Live Tables.
Примеры записных книжек исходного кода
Эти записные книжки можно импортировать в рабочую область Azure Databricks и использовать их для развертывания конвейера Delta Live Tables. См. раздел Создание конвейера.
Начало работы с записной книжкой Python для Delta Live Tables
Начало работы с записной книжкой SQL для Delta Live Tables
Примеры записных книжек исходного кода для рабочих областей без каталога Unity
Эти записные книжки можно импортировать в рабочую область Azure Databricks без включения каталога Unity и использовать их для развертывания конвейера Delta Live Tables. См. раздел Создание конвейера.