Поделиться через


Руководство: Создание первого конвейера с помощью редактора конвейеров Lakeflow

Узнайте, как создать новый конвейер с помощью Декларативных конвейеров Lakeflow Spark (SDP) для оркестрации данных и Auto Loader. В этом руководстве рассматривается расширение примерного конвейера посредством очистки данных и создания запроса для нахождения 100 лучших пользователей.

В этом руководстве вы узнаете, как использовать редактор Конвейеров Lakeflow для:

  • Создайте конвейер со структурой папок по умолчанию и начните с набора примеров файлов.
  • Определите ограничения качества данных с помощью ожиданий.
  • Используйте функции редактора для расширения конвейера с новым преобразованием для выполнения анализа данных.

Требования

Перед началом работы с этим руководством необходимо выполнить следующие действия.

  • Войдите в рабочую область Azure Databricks.
  • Убедитесь, что каталог Unity включен для вашей рабочей области.
  • Укажите редактор конвейеров Lakeflow для рабочей области, и вы должны быть включены. См. Включение редактора конвейеров Lakeflow и обновленный мониторинг.
  • Разрешение на создание вычислительного ресурса или доступ к вычислительному ресурсу.
  • У вас есть разрешения на создание новой схемы в каталоге. Необходимые разрешения — это ALL PRIVILEGES или USE CATALOG и CREATE SCHEMA.

Шаг 1. Создание конвейера

На этом шаге вы создадите конвейер с помощью структуры папок по умолчанию и примеров кода. Примеры кода ссылались на таблицу users в wanderbricks примере источника данных.

  1. В рабочей области Azure Databricks щелкните значок , Новый, затем значок конвейера, Конвейер ETL. Откроется редактор конвейера на странице создания конвейера.

  2. Щелкните заголовок, чтобы указать имя конвейера.

  3. Под именем выберите каталог по умолчанию и схему для выходных таблиц. Они используются, если не указывать каталог и схему в определениях конвейера.

  4. В разделе "Далее" для конвейера щелкните значок схемы.Начните с примера кода в значке SQL или схемы.Начните с примера кода в Python на основе ваших языковых предпочтений. Это изменяет язык по умолчанию для примера кода, но вы можете добавить код на другом языке позже. Это создает структуру папок по умолчанию с примером кода, чтобы приступить к работе.

  5. Вы можете просмотреть пример кода в обозревателе активов конвейера, расположенном слева от рабочей области. Под transformations находятся два файла, каждый из которых создает один набор данных конвейера. В разделе explorations есть записная книжка с кодом, помогающая просматривать выходные данные конвейера. Щелкнув файл, вы можете просмотреть и изменить код в редакторе.

    Выходные наборы данных еще не созданы, а граф конвейера справа от экрана пуст.

  6. Чтобы запустить код конвейера (код в папке), нажмите кнопку transformations" в правой верхней части экрана.

    После завершения выполнения в нижней части рабочей области отображаются две новые таблицы, sample_users_<pipeline-name> и sample_aggregation_<pipeline-name>, которые были созданы. Вы также можете увидеть, что график конвейера в правой части рабочей области теперь отображает две таблицы, в том числе sample_users является источником для sample_aggregation.

Шаг 2. Применение проверок качества данных

На этом шаге вы добавите проверку качества данных в таблицу sample_users . Ожидания конвейера используются для ограничения данных. В этом случае вы удаляете все записи пользователя, у которых нет допустимого адреса электронной почты, и выводит чистую таблицу как users_cleaned.

  1. В браузере активов конвейера щелкните значок и выберите "Преобразование".

  2. В диалоговом окне создания файла преобразования сделайте следующее:

    • Выберите Python или SQL для языка. Это не обязательно соответствует предыдущему выбору.
    • Присвойте файлу имя. В этом случае выберите users_cleaned.
    • Для пути назначения оставьте значение по умолчанию.
    • Для типа набора данных оставьте его как "Нет" или выберите "Материализованное представление". Если выбрать материализованное представление, он создает пример кода для вас.
  3. В новом файле кода измените код следующим образом (используйте SQL или Python на основе выбора на предыдущем экране). Замените <pipeline-name> на полное имя для вашей таблицы sample_users.

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<pipeline-name>;
    

    Питон

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.table
    @dp.expect_or_drop("no null emails", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<pipeline_name>")
        )
    
  4. Нажмите кнопку "Запустить конвейер ", чтобы обновить конвейер. Теперь она должна иметь три таблицы.

Шаг 3. Анализ лучших пользователей

Затем получите 100 лучших пользователей по количеству созданных резервирований. Присоединяйте таблицу wanderbricks.bookings к материализованному представлению users_cleaned .

  1. В браузере активов конвейера щелкните значок и выберите "Преобразование".

  2. В диалоговом окне создания файла преобразования сделайте следующее:

    • Выберите Python или SQL для языка. Это не обязательно соответствует предыдущим выбранным параметрам.
    • Присвойте файлу имя. В этом случае выберите users_and_bookings.
    • Для пути назначения оставьте значение по умолчанию.
    • Для типа набора данных оставьте в значении Не выбрано.
  3. В новом файле кода измените код следующим образом (используйте SQL или Python на основе выбора на предыдущем экране).

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    Питон

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.table
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  4. Нажмите кнопку "Запустить конвейер ", чтобы обновить наборы данных. После завершения выполнения вы сможете увидеть в графике конвейера четыре таблицы, включая новую таблицу users_and_bookings.

    Графическое представление конвейера с четырьмя таблицами

Дальнейшие шаги

Теперь, когда вы узнали, как использовать некоторые функции редактора конвейеров Lakeflow и создали конвейер, ознакомьтесь с другими функциями, чтобы узнать больше о следующих возможностях: