Поделиться через


Руководство по созданию конвейера ETL с помощью декларативных конвейеров Lakeflow

Узнайте, как создавать и развертывать конвейер ETL (извлечение, преобразование и загрузку) для оркестрации данных с помощью декларативных конвейеров Lakeflow и автозагрузчика. Конвейер ETL реализует шаги для чтения данных из исходных систем, преобразования этих данных на основе требований, таких как проверки качества данных и отмена дублирования данных, а также запись данных в целевую систему, например хранилище данных или озеро данных.

В этом руководстве вы будете использовать декларативные конвейеры Lakeflow и функцию автоматической загрузки:

  • Загрузка необработанных исходных данных в целевую таблицу.
  • Преобразование необработанных исходных данных и запись преобразованных данных в два целевых материализованных представления.
  • Запрос преобразованных данных.
  • Автоматизируйте конвейер ETL с помощью задачи в Databricks.

Дополнительные сведения о декларативных конвейерах Lakeflow и автозагрузчике см. в разделе "Декларативные конвейеры Lakeflow " и "Что такое автозагрузчик?

Требования

Чтобы завершить работу с этим руководством, необходимо выполнить следующие требования:

Сведения о наборе данных

Набор данных, используемый в этом примере, представляет собой подмножество набора данных "Миллион песен", коллекцию функций и метаданных для современных музыкальных треков. Этот набор данных доступен в примерах наборов данных, включенных в рабочую область Azure Databricks.

шаг 1. Создание конвейера

Сначала вы создадите конвейер ETL в системе Lakeflow Declarative Pipelines. Декларативные конвейеры Lakeflow создают конвейеры путем разрешения зависимостей, определенных в записных книжках или файлах (называемом исходным кодом) с помощью синтаксиса декларативного конвейера Lakeflow. Каждый файл исходного кода может содержать только один язык, но в конвейер можно добавить несколько записных книжек или файлов, относящихся к определенному языку. Дополнительные сведения см. в разделе "Декларативные конвейеры Lakeflow"

Это важно

Оставьте поле исходного кода пустым, чтобы создать и настроить записную книжку для создания исходного кода автоматически.

В этом руководстве используются бессерверные вычислительные ресурсы и каталог Unity. Для всех параметров конфигурации, которые не указаны, используйте параметры по умолчанию. Если бессерверные вычисления не включены или не поддерживаются в вашей рабочей области, вы можете выполнить учебник, как описано, с использованием настроек вычислений по умолчанию. Если вы используете параметры вычислений по умолчанию, необходимо вручную выбрать каталог Unity в разделе Параметры хранилища в секции Назначение пользовательского интерфейса Создать конвейер.

Чтобы создать новый конвейер ETL в Декларативных конвейерах Lakeflow, выполните следующие действия.

  1. В рабочей области щелкните на значок рабочих процессовЗадания и конвейеры на боковой панели.
  2. В меню Создать нажмите Конвейер ETL.
  3. В имени конвейера введите уникальное имя конвейера.
  4. Установите флажок "Бессерверный ".
  5. Чтобы настроить расположение Unity Catalog, где публикуются таблицы, выберите существующий каталог и введите новое имя в поле схема, чтобы создать новую схему в вашем каталоге.
  6. Нажмите кнопку "Создать".

Пользовательский интерфейс конвейера отображается для нового конвейера.

Шаг 2. Разработка конвейера

Это важно

Записные книжки могут содержать только один язык программирования. Не смешивайте код Python и SQL в записных книжках исходного кода потока данных.

На этом шаге вы будете использовать Записные книжки Databricks для разработки и проверки исходного кода для Декларативных конвейеров Lakeflow в интерактивном режиме.

В коде используется автозагрузчик для добавочного приема данных. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище. Дополнительные сведения см. в разделе "Что такое автозагрузчик"

Пустая записная книжка исходного кода создается автоматически и настроена для конвейера. Записная книжка создается в новом каталоге в пользовательском каталоге. Имя нового каталога и файла совпадает с именем конвейера. Например, /Users/someone@example.com/my_pipeline/my_pipeline.

При разработке конвейера можно выбрать Python или SQL. Примеры включены для обоих языков. В зависимости от выбранного языка убедитесь, что выбран язык записной книжки по умолчанию. Чтобы узнать больше о поддержке ноутбуков для разработки кода в Декларативных конвейерах Lakeflow, см. статью Разработка и отладка конвейеров ETL с помощью ноутбука в Декларативных конвейерах Lakeflow.

  1. Ссылка для доступа к этой записной книжке расположена в поле исходного кода на панели подробности о конвейере. Щелкните ссылку, чтобы открыть записную книжку, прежде чем перейти к следующему шагу.

  2. Нажмите кнопку "Подключиться" в правом верхнем углу, чтобы открыть меню конфигурации вычислений.

  3. Наведите указатель мыши на имя конвейера, созданного на шаге 1.

  4. Нажмите кнопку "Подключить".

  5. Рядом с заголовком записной книжки в верхней части выберите язык по умолчанию записной книжки (Python или SQL).

  6. Скопируйте и вставьте следующий код в ячейку записной книжки.

    Питон

    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dlt.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .option("inferSchema", True)
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dlt.table(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
    
  7. Нажмите кнопку "Пуск" , чтобы начать обновление подключенного конвейера.

Шаг 3. Запрос преобразованных данных

На этом шаге вы будете запрашивать данные, обработанные в конвейере ETL, чтобы проанализировать данные песни. Эти запросы используют подготовленные записи, созданные на предыдущем шаге.

Сначала выполните запрос, который находит артистов, выпустивших наибольшее количество песен каждый год с 1990 года.

  1. На боковой панели щелкните иконку редактора SQLSQL Editor.

  2. Щелкните значок " и выберите " Создать запрос " в меню.

  3. Введите следующее:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
    

    Замените <catalog> и <schema> на имя каталога и схемы, в которых находится таблица. Например, data_pipelines.songs_data.top_artists_by_year.

  4. Нажмите Запустить выбранное.

Теперь выполните еще один запрос, который находит песни с 4/4 ударом и танцуемым темпом.

  1. Щелкните значок " и нажмите кнопку "Создать запрос " в меню.

  2. Введите следующий код:

     -- Find songs with a 4/4 beat and danceable tempo
     SELECT artist_name, song_title, tempo
     FROM <catalog>.<schema>.songs_prepared
     WHERE time_signature = 4 AND tempo between 100 and 140;
    

    Замените <catalog> и <schema> на имя каталога и схемы, в которых находится таблица. Например, data_pipelines.songs_data.songs_prepared.

  3. Нажмите Запустить выбранное.

Шаг 4. Создание задания для запуска конвейера

Затем создайте рабочий процесс для автоматизации приема, обработки и анализа данных с помощью задания Databricks.

  1. В рабочей области щелкните на значок рабочих процессовЗадания и конвейеры на боковой панели.
  2. В разделе "Новый" нажмите «Задание».
  3. В поле заголовка задачи замените дату и время< нового задания > именем задания. Например, Songs workflow.
  4. В поле "Имя задачи" введите имя первой задачи, например ETL_songs_data.
  5. В поле "Тип" выберите "Конвейер".
  6. В конвейере выберите конвейер, созданный на шаге 1.
  7. Нажмите кнопку "Создать".
  8. Чтобы запустить рабочий процесс, нажмите кнопку "Запустить сейчас". Чтобы просмотреть детали запуска, щелкните вкладку Запуски. Щелкните задачу, чтобы просмотреть детали выполнения задачи.
  9. Чтобы просмотреть результаты после завершения рабочего процесса, нажмите кнопку "Перейти к последнему успешному запуску " или " Время начала " для выполнения задания. Откроется страница вывода и отображается результаты запроса.

Дополнительные сведения о выполнении заданий см. в статье "Мониторинг и наблюдаемость заданий Lakeflow ".

Шаг 5. Планирование задачи конвейера

Чтобы запустить конвейер ETL по расписанию, выполните следующие действия.

  1. Перейдите к пользовательскому интерфейсу заданий и конвейеров в той же рабочей области Azure Databricks, что и задание.
  2. При необходимости выберите фильтры "Задания " и " Принадлежащие мне ".
  3. В столбце "Имя" щелкните имя задания. На боковой панели отображаются сведения о задании.
  4. Нажмите кнопку "Добавить триггер" на панели "Расписания и триггеры " и выберите "Запланированный " в типе триггера.
  5. Укажите период, время начала и часовой пояс.
  6. Нажмите кнопку "Сохранить".

Подробнее