Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Узнайте, как создать новый конвейер с помощью Декларативных конвейеров Lakeflow Spark (SDP) для оркестрации данных и Auto Loader. В этом руководстве рассматривается расширение примерного конвейера посредством очистки данных и создания запроса для нахождения 100 лучших пользователей.
В этом руководстве вы узнаете, как использовать редактор Конвейеров Lakeflow для:
- Создайте конвейер со структурой папок по умолчанию и начните с набора примеров файлов.
- Определите ограничения качества данных с помощью ожиданий.
- Используйте функции редактора для расширения конвейера с новым преобразованием для выполнения анализа данных.
Требования
Перед началом работы с этим руководством необходимо выполнить следующие действия.
- Войдите в рабочую область Azure Databricks.
- Убедитесь, что каталог Unity включен для вашей рабочей области.
- Укажите редактор конвейеров Lakeflow для рабочей области, и вы должны быть включены. См. Включение редактора конвейеров Lakeflow и обновленный мониторинг.
- Разрешение на создание вычислительного ресурса или доступ к вычислительному ресурсу.
- У вас есть разрешения на создание новой схемы в каталоге. Необходимые разрешения — это
ALL PRIVILEGESилиUSE CATALOGиCREATE SCHEMA.
Шаг 1. Создание конвейера
На этом шаге вы создадите конвейер с помощью структуры папок по умолчанию и примеров кода. Примеры кода ссылались на таблицу users в wanderbricks примере источника данных.
В рабочей области Azure Databricks щелкните
, Новый, затем
, Конвейер ETL. Откроется редактор конвейера на странице создания конвейера.
Щелкните заголовок, чтобы указать имя конвейера.
Под именем выберите каталог по умолчанию и схему для выходных таблиц. Они используются, если не указывать каталог и схему в определениях конвейера.
В разделе "Далее" для конвейера щелкните
Начните с примера кода в значке SQL или
Начните с примера кода в Python на основе ваших языковых предпочтений. Это изменяет язык по умолчанию для примера кода, но вы можете добавить код на другом языке позже. Это создает структуру папок по умолчанию с примером кода, чтобы приступить к работе.
Вы можете просмотреть пример кода в обозревателе активов конвейера, расположенном слева от рабочей области. Под
transformationsнаходятся два файла, каждый из которых создает один набор данных конвейера. В разделеexplorationsесть записная книжка с кодом, помогающая просматривать выходные данные конвейера. Щелкнув файл, вы можете просмотреть и изменить код в редакторе.Выходные наборы данных еще не созданы, а граф конвейера справа от экрана пуст.
Чтобы запустить код конвейера (код в папке), нажмите кнопку
transformations" в правой верхней части экрана.После завершения выполнения в нижней части рабочей области отображаются две новые таблицы,
sample_users_<pipeline-name>иsample_aggregation_<pipeline-name>, которые были созданы. Вы также можете увидеть, что график конвейера в правой части рабочей области теперь отображает две таблицы, в том числеsample_usersявляется источником дляsample_aggregation.
Шаг 2. Применение проверок качества данных
На этом шаге вы добавите проверку качества данных в таблицу sample_users .
Ожидания конвейера используются для ограничения данных. В этом случае вы удаляете все записи пользователя, у которых нет допустимого адреса электронной почты, и выводит чистую таблицу как users_cleaned.
В браузере активов конвейера щелкните
и выберите "Преобразование".
В диалоговом окне создания файла преобразования сделайте следующее:
- Выберите Python или SQL для языка. Это не обязательно соответствует предыдущему выбору.
- Присвойте файлу имя. В этом случае выберите
users_cleaned. - Для пути назначения оставьте значение по умолчанию.
- Для типа набора данных оставьте его как "Нет" или выберите "Материализованное представление". Если выбрать материализованное представление, он создает пример кода для вас.
В новом файле кода измените код следующим образом (используйте SQL или Python на основе выбора на предыдущем экране). Замените
<pipeline-name>на полное имя для вашей таблицыsample_users.SQL
-- Drop all rows that do not have an email address CREATE MATERIALIZED VIEW users_cleaned ( CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW ) AS SELECT * FROM sample_users_<pipeline-name>;Питон
from pyspark import pipelines as dp # Drop all rows that do not have an email address @dp.table @dp.expect_or_drop("no null emails", "email IS NOT NULL") def users_cleaned(): return ( spark.read.table("sample_users_<pipeline_name>") )Нажмите кнопку "Запустить конвейер ", чтобы обновить конвейер. Теперь она должна иметь три таблицы.
Шаг 3. Анализ лучших пользователей
Затем получите 100 лучших пользователей по количеству созданных резервирований. Присоединяйте таблицу wanderbricks.bookings к материализованному представлению users_cleaned .
В браузере активов конвейера щелкните
и выберите "Преобразование".
В диалоговом окне создания файла преобразования сделайте следующее:
- Выберите Python или SQL для языка. Это не обязательно соответствует предыдущим выбранным параметрам.
- Присвойте файлу имя. В этом случае выберите
users_and_bookings. - Для пути назначения оставьте значение по умолчанию.
- Для типа набора данных оставьте в значении Не выбрано.
В новом файле кода измените код следующим образом (используйте SQL или Python на основе выбора на предыдущем экране).
SQL
-- Get the top 100 users by number of bookings CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS SELECT u.name AS name, COUNT(b.booking_id) AS booking_count FROM users_cleaned u JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id GROUP BY u.name ORDER BY booking_count DESC LIMIT 100;Питон
from pyspark import pipelines as dp from pyspark.sql.functions import col, count, desc # Get the top 100 users by number of bookings @dp.table def users_and_bookings(): return ( spark.read.table("users_cleaned") .join(spark.read.table("samples.wanderbricks.bookings"), "user_id") .groupBy(col("name")) .agg(count("booking_id").alias("booking_count")) .orderBy(desc("booking_count")) .limit(100) )Нажмите кнопку "Запустить конвейер ", чтобы обновить наборы данных. После завершения выполнения вы сможете увидеть в графике конвейера четыре таблицы, включая новую таблицу
users_and_bookings.
Дальнейшие шаги
Теперь, когда вы узнали, как использовать некоторые функции редактора конвейеров Lakeflow и создали конвейер, ознакомьтесь с другими функциями, чтобы узнать больше о следующих возможностях:
Средства для работы с преобразованиями и отладкой при создании конвейеров:
- Выборочное выполнение
- Предварительные версии данных
- Интерактивный DAG (граф наборов данных в вашем конвейере)
Встроенная интеграция пакетов активов Databricks для эффективной совместной работы, управления версиями и интеграции CI/CD непосредственно из редактора: