Создание сквозного конвейера данных в Databricks

В этой статье показано, как создать и развернуть сквозной конвейер обработки данных, включая прием необработанных данных, преобразование данных и выполнение анализа обработанных данных.

Примечание.

Хотя в этой статье показано, как создать полный конвейер данных с помощью записных книжек Databricks и задания Azure Databricks для оркестрации рабочего процесса, Databricks рекомендует использовать Delta Live Tables, декларативный интерфейс для создания надежных, доступных для обслуживания и тестируемых конвейеров обработки данных.

Что такое конвейер данных

Конвейер данных реализует шаги, необходимые для перемещения данных из исходных систем, преобразования этих данных на основе требований и хранения данных в целевой системе. Конвейер данных включает все процессы, необходимые для преобразования необработанных данных в подготовленные данные, которые пользователи могут использовать. Например, конвейер данных может подготовить данные, чтобы аналитики и специалисты по обработке и анализу данных могли извлечь значение из данных с помощью анализа и отчетности.

Рабочий процесс извлечения, преобразования и загрузки (ETL) является общим примером конвейера данных. При обработке ETL данные передаются из исходных систем и записываются в промежуточную область, преобразуются на основе требований (обеспечение качества данных, дедупликации записей и т. д.), а затем записываются в целевую систему, например хранилище данных или озеро данных.

Шаги конвейера данных

Чтобы приступить к созданию конвейеров данных в Azure Databricks, в примере, приведенном в этой статье, описывается создание рабочего процесса обработки данных:

  • Используйте функции Azure Databricks для изучения необработанного набора данных.
  • Создайте записную книжку Databricks для приема необработанных исходных данных и записи необработанных данных в целевую таблицу.
  • Создайте записную книжку Databricks для преобразования необработанных исходных данных и записи преобразованных данных в целевую таблицу.
  • Создайте записную книжку Databricks для запроса преобразованных данных.
  • Автоматизируйте конвейер данных с помощью задания Azure Databricks.

Требования

Пример: набор данных "Миллион песен"

Набор данных, используемый в этом примере, представляет собой подмножество набора данных "Миллион песен", коллекцию функций и метаданных для современных музыкальных треков. Этот набор данных доступен в примерах наборов данных, включенных в рабочую область Azure Databricks.

Шаг 1. Создание кластера

Чтобы выполнить обработку и анализ данных в этом примере, создайте кластер для предоставления вычислительных ресурсов, необходимых для выполнения команд.

Примечание.

Так как в этом примере используется пример набора данных, хранящегося в DBFS, и рекомендуется сохранять таблицы в каталоге Unity, вы создадите кластер, настроенный с одним режимом доступа пользователей. Режим доступа с одним пользователем предоставляет полный доступ к DBFS, а также обеспечивает доступ к каталогу Unity. Ознакомьтесь с рекомендациями по каталогу DBFS и Unity.

  1. На боковой панели щелкните Вычислительная среда.
  2. На странице "Вычислительная среда" щелкните элемент Создать кластер.
  3. На странице "Новый кластер" введите уникальное имя кластера.
  4. В режиме доступа выберите один пользователь.
  5. В доступе к одному пользователю или субъекту-службе выберите имя пользователя.
  6. Оставьте оставшиеся значения в состоянии по умолчанию и нажмите кнопку "Создать кластер".

Дополнительные сведения о кластерах Databricks см. в статье "Вычисления".

Шаг 2. Изучение исходных данных

Сведения об использовании интерфейса Azure Databricks для изучения необработанных исходных данных см. в статье "Изучение исходных данных для конвейера данных". Если вы хотите перейти непосредственно к приему и подготовке данных, перейдите к шагу 3. Прием необработанных данных.

Шаг 3. Прием необработанных данных

На этом шаге вы загружаете необработанные данные в таблицу, чтобы сделать ее доступной для дальнейшей обработки. Чтобы управлять ресурсами данных на платформе Databricks, например таблицами, Databricks рекомендует каталог Unity. Однако если у вас нет разрешений на создание необходимого каталога и схемы для публикации таблиц в каталоге Unity, вы по-прежнему можете выполнить следующие действия, публикуя таблицы в хранилище метаданных Hive.

Для приема данных Databricks рекомендует использовать автозагрузчик. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище.

Вы можете настроить автозагрузчик для автоматического обнаружения схемы загруженных данных, позволяя инициализировать таблицы без явного объявления схемы данных и развивать схему таблицы в виде новых столбцов. Это устраняет необходимость вручную отслеживать и применять изменения схемы с течением времени. Databricks рекомендует вывод схемы при использовании автозагрузчика. Однако, как показано на этапе исследования данных, данные песен не содержат сведения о заголовке. Так как заголовок не хранится с данными, необходимо явно определить схему, как показано в следующем примере.

  1. На боковой панели нажмите кнопку "Создать" и выберите New Icon"Записная книжка" в меню. Откроется диалоговое окно Создание записной книжки.

  2. Введите имя записной книжки, например Ingest songs data. По умолчанию:

    • Python — это выбранный язык.
    • Записная книжка подключена к последнему используемому кластеру. В этом случае кластер, созданный на шаге 1. Создание кластера.
  3. Введите следующую ячейку записной книжки:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Если вы используете каталог Unity, замените <table-name> на каталог, схему и имя таблицы для хранения записей приема (например, data_pipelines.songs_data.raw_song_data). В противном случае замените <table-name> именем таблицы, содержащей записи приема, например raw_song_data.

    Замените <checkpoint-path> путь к каталогу в DBFS для поддержания файлов проверка point, например/tmp/pipeline_get_started/_checkpoint/song_data.

  4. Щелкните Run Menuи выберите "Выполнить ячейку". В этом примере определяется схема данных с помощью сведений из READMEтаблицы, приема данных песен из всех файлов, содержащихся в file_path, и записывает данные в таблицу, указанную в table_nameней.

Шаг 4. Подготовка необработанных данных

Чтобы подготовить необработанные данные для анализа, выполните следующие действия, чтобы преобразовать необработанные данные песен, отфильтровав ненужные столбцы и добавив новое поле, содержащее метку времени для создания новой записи.

  1. На боковой панели нажмите кнопку "Создать" и выберите New Icon"Записная книжка" в меню. Откроется диалоговое окно Создание записной книжки.

  2. Введите имя записной книжки. Например, Prepare songs data. Измените язык по умолчанию на SQL.

  3. Введите следующую ячейку записной книжки:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Если вы используете каталог Unity, замените <table-name> на каталог, схему и имя таблицы, чтобы содержать отфильтрованные и преобразованные записи (например, data_pipelines.songs_data.prepared_song_data). В противном случае замените <table-name> именем таблицы, содержащей отфильтрованные и преобразованные записи (например, prepared_song_data).

    Замените <raw-songs-table-name> именем таблицы, содержащей необработанные записи песен, которые были приема на предыдущем шаге.

  4. Щелкните Run Menuи выберите "Выполнить ячейку".

Шаг 5. Запрос преобразованных данных

На этом шаге вы расширяете конвейер обработки, добавляя запросы для анализа данных песен. Эти запросы используют подготовленные записи, созданные на предыдущем шаге.

  1. На боковой панели нажмите кнопку "Создать" и выберите New Icon"Записная книжка" в меню. Откроется диалоговое окно Создание записной книжки.

  2. Введите имя записной книжки. Например, Analyze songs data. Измените язык по умолчанию на SQL.

  3. Введите следующую ячейку записной книжки:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Замените <prepared-songs-table-name> именем таблицы, содержащей подготовленные данные. Например, data_pipelines.songs_data.prepared_song_data.

  4. Щелкните Down Caret в меню действий ячейки, выберите "Добавить ячейку ниже " и введите следующую команду в новой ячейке:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Замените <prepared-songs-table-name> именем подготовленной таблицы, созданной на предыдущем шаге. Например, data_pipelines.songs_data.prepared_song_data.

  5. Чтобы выполнить запросы и просмотреть выходные данные, нажмите кнопку "Выполнить все".

Шаг 6. Создание задания Azure Databricks для запуска конвейера

Вы можете создать рабочий процесс для автоматизации выполнения действий приема, обработки и анализа данных с помощью задания Azure Databricks.

  1. В рабочей области Обработка и анализ данных и инженерии выполните одно из следующих действий:
    • Щелкните Jobs Iconрабочие процессы на боковой панели и щелкните .Create Job Button
    • На боковой панели нажмите кнопку "Создать" и выберите New Icon"Задание".
  2. В диалоговом окне задачи на вкладке "Задачи " замените имя задания... именем задания. Например, "Рабочий процесс "Песни".
  3. В поле "Имя задачи" введите имя первой задачи, например Ingest_songs_data.
  4. Введите тип задачи Notebook.
  5. В поле "Источник" выберите "Рабочая область".
  6. Используйте браузер файлов, чтобы найти записную книжку приема данных, щелкните имя записной книжки и нажмите кнопку "Подтвердить".
  7. В кластере выберите Shared_job_cluster или кластер, созданный на шаге Create a cluster .
  8. Нажмите кнопку Создать.
  9. Щелкните Add Task Button под только что созданной задачей и выберите "Записная книжка".
  10. В поле "Имя задачи" введите имя задачи, например Prepare_songs_data.
  11. Введите тип задачи Notebook.
  12. В поле "Источник" выберите "Рабочая область".
  13. Используйте браузер файлов для поиска записной книжки подготовки данных, щелкните имя записной книжки и нажмите кнопку "Подтвердить".
  14. В кластере выберите Shared_job_cluster или кластер, созданный на шаге Create a cluster .
  15. Нажмите кнопку Создать.
  16. Щелкните Add Task Button под только что созданной задачей и выберите "Записная книжка".
  17. В поле "Имя задачи" введите имя задачи, например Analyze_songs_data.
  18. Введите тип задачи Notebook.
  19. В поле "Источник" выберите "Рабочая область".
  20. Используйте браузер файлов, чтобы найти записную книжку для анализа данных, щелкните имя записной книжки и нажмите кнопку "Подтвердить".
  21. В кластере выберите Shared_job_cluster или кластер, созданный на шаге Create a cluster .
  22. Нажмите кнопку Создать.
  23. Чтобы запустить рабочий процесс, нажмите кнопку Run Now Button. Чтобы просмотреть сведения о выполнении, щелкните ссылку в столбце "Время начала" для запуска в представлении выполнения задания. Щелкните каждую задачу, чтобы просмотреть сведения о выполнении задачи.
  24. Чтобы просмотреть результаты после завершения рабочего процесса, щелкните окончательную задачу анализа данных. Откроется страница вывода и отображается результаты запроса.

Шаг 7. Планирование задания конвейера данных

Примечание.

Чтобы продемонстрировать использование задания Azure Databricks для оркестрации запланированного рабочего процесса, этот пример начала разделяет этапы приема, подготовки и анализа на отдельные записные книжки, а затем каждая записная книжка используется для создания задачи в задании. Если все обработка содержится в одной записной книжке, вы можете легко запланировать записную книжку непосредственно из пользовательского интерфейса записной книжки Azure Databricks. См. статью "Создание запланированных заданий записной книжки и управление ими".

Обычное требование заключается в том, чтобы запустить конвейер данных на запланированной основе. Чтобы определить расписание для задания, запускающего конвейер:

  1. Щелкните Jobs Iconрабочие процессы на боковой панели.
  2. В столбце Имя нажмите на имя задания. На боковой панели отображаются Сведения о задании.
  3. Нажмите кнопку "Добавить триггер" на панели сведений о задании и выберите "Запланированный" в типе триггера.
  4. Укажите период, время начала и часовой пояс. При необходимости установите флажок Показать синтаксис cron, чтобы отобразить и изменить расписание в синтаксисе Quartz Cron.
  5. Нажмите кнопку Сохранить.

Подробнее