Руководство. Запуск сквозного конвейера аналитики Lakehouse

В этом руководстве показано, как настроить комплексный конвейер аналитики для озера Azure Databricks.

Внимание

В этом руководстве используются интерактивные записные книжки для выполнения распространенных задач извлечения, преобразования и загрузки в Python в кластерах с поддержкой каталога Unity. Если вы не используете каталог Unity, см. статью Выполнение первой рабочей нагрузки извлечения, преобразования и загрузки в Azure Databricks.

Задачи в этом руководстве

К концу этой статьи вы сможете свободно выполнять следующие операции:

  1. Запуск вычислительного кластера с поддержкой каталога Unity.
  2. Создание записной книжки Databricks.
  3. Запись и чтение данных из внешнего расположения каталога Unity.
  4. Настройка добавочного приема данных в таблицу каталога Unity с помощью автозагрузчика.
  5. Выполнение ячеек записной книжки для обработки, запроса и предварительного просмотра данных.
  6. Планирование записной книжки в качестве задания Databricks.
  7. Запрос к таблицам каталога Unity из Databricks SQL

Azure Databricks предоставляет набор готовых к использованию средств, позволяющих специалистам по обработке и анализу данных быстро разрабатывать и развертывать конвейеры извлечения, преобразования и загрузки (ETL). Каталог Unity позволяет администраторам данных настраивать и защищать учетные данные хранения, внешние расположения и объекты базы данных для пользователей по всей организации. Databricks SQL позволяет аналитикам выполнять SQL-запросы к тем же таблицам, которые используются в рабочих нагрузках ETL, что позволяет выполнять бизнес-аналитику в реальном времени в большом масштабе.

Вы также можете использовать разностные динамические таблицы для создания конвейеров ETL. Платформа Databricks создала разностные динамические таблицы, чтобы снизить сложность сборки, развертывания и обслуживания рабочих конвейеров ETL. См . руководство. Запуск первого конвейера live tables Delta Live Tables.

Требования

Примечание.

Если у вас нет прав управления кластером, вы по-прежнему можете выполнить большинство описанных ниже действий при наличии у вас доступа к кластеру.

Шаг 1. Создание кластера

Чтобы выполнить исследовательский анализ данных и инжиниринг данных, создайте кластер для предоставления вычислительных ресурсов, необходимых для выполнения команд.

  1. На боковой панели щелкните Значок вычисленийВычислительная среда.
  2. Нажмите кнопку Значок "Создать" на боковой панели и выберите "Кластер". Откроется страница создания кластера или вычислений.
  3. Укажите уникальное имя для кластера.
  4. Выберите переключатель Один узел.
  5. Выберите одного пользователя из раскрывающегося списка "Режим доступа".
  6. Убедитесь, что ваш адрес электронной почты отображается в поле "Один пользователь ".
  7. Выберите требуемую версию среды выполнения Databricks, 11.1 или более поздней, чтобы использовать каталог Unity.
  8. Нажмите кнопку "Создать вычисления" , чтобы создать кластер.

Дополнительные сведения о кластерах Databricks см. в статье "Вычисления".

Шаг 2. Создание записной книжки Databricks

Чтобы приступить к написанию и выполнению интерактивного кода в Azure Databricks, создайте записную книжку.

  1. Нажмите кнопку Значок "Создать" на боковой панели и щелкните "Записная книжка".
  2. На странице создания записной книжки:
    • Укажите уникальное имя для записной книжки.
    • Убедитесь, что в качестве языка по умолчанию выбран Python.
    • Используйте раскрывающееся меню Подключение, чтобы выбрать кластер, созданный на шаге 1 в раскрывающемся списке кластера.

Записная книжка открывается с одной пустой ячейкой.

Дополнительные сведения о создании записных книжек и управлении ими см. в статье Управление записными книжками.

Шаг 3. Запись и чтение данных из внешнего расположения, управляемого каталогом Unity

В Databricks рекомендуется использовать Автозагрузчик для добавочного приема данных. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище.

Используйте каталог Unity для управления безопасным доступом к внешним расположениям. Пользователи или субъекты-службы с разрешениями READ FILES для внешнего расположения могут использовать автозагрузчик для приема данных.

Обычно данные поступают во внешнее расположение из-за операций записи из других систем. В этой демонстрации можно имитировать поступление данных, записав JSON-файлы во внешнее расположение.

Скопируйте приведенный ниже код в ячейку записной книжки. Замените строковое значение catalog на имя каталога с разрешениями CREATE CATALOG и USE CATALOG. Замените строковое значение external_location на путь к внешнему расположению с разрешениями READ FILES, WRITE FILES и CREATE EXTERNAL TABLE.

Внешние расположения можно определить как весь контейнер хранилища, но часто указывают путь к каталогу, вложенному в контейнер.

Правильный формат пути к внешнему расположению: "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

При выполнении этой ячейки должна быть напечатана строка, которая считывает 12 байтов, печатает строку "Hello world!", и отображает все базы данных, представленные в каталоге. Если вы не можете получить эту ячейку для запуска, убедитесь, что вы находитесь в рабочей области с поддержкой каталога Unity, и запросите соответствующие разрешения от администратора рабочей области, чтобы выполнить работу с этим руководством.

Приведенный ниже код Python использует ваш адрес электронной почты для создания уникальной базы данных в указанном каталоге и уникальном расположении хранилища во внешнем расположении. При выполнении этой ячейки будут удалены все данные, связанные с этим руководством, что позволяет выполнять этот пример индемпотентно. Класс определяется и создается экземпляр, который будет использоваться для имитации пакетов данных, поступающих из подключенной системы к исходному внешнему расположению.

Скопируйте этот код в новую ячейку в записной книжке и выполните его, чтобы настроить среду.

Примечание.

Переменные, определенные в этом коде, должны позволить безопасно выполнять его без риска конфликта с существующими ресурсами рабочей области или другими пользователями. Ограниченные разрешения сети или хранилища приведут к ошибкам при выполнении этого кода; обратитесь к администратору рабочей области, чтобы решить проблему этих ограничений.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

Теперь пакет данных можно приземлить, скопировать следующий код в ячейку и выполнить его. Эту ячейку можно выполнить вручную до 60 раз, чтобы активировать поступление новых данных.

RawData.land_batch()

Шаг 4. Настройка автозагрузчика для приема данных в каталог Unity

Databricks рекомендует хранить данные с использованием формата Delta Lake. Delta Lake — это уровень хранения открытого кода, который предоставляет транзакции ACID и обеспечивает гибридное решение "хранилище и озеро данных". Delta Lake — это формат по умолчанию для таблиц, созданных в Databricks.

Чтобы настроить автозагрузчик для приема данных в таблицу каталога Unity, скопируйте следующий код в пустую ячейку записной книжки:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Дополнительные сведения об автозагрузчике см. в статье Автозагрузчик.

Дополнительные сведения о структурированной потоковой передаче с помощью каталога Unity см. в разделе Использование каталога Unity с структурированной потоковой передачей.

Шаг 5. Обработка данных и взаимодействие с ними

Записные книжки выполняют логические ячейки последовательно. Выполните следующие действия, чтобы выполнить логику в ячейке:

  1. Чтобы запустить ячейку, выполненную на предыдущем шаге, выберите ячейку и нажмите клавиши SHIFT+ВВОД.

  2. Чтобы запросить только что созданную таблицу, скопируйте следующий код в пустую ячейку, а затем нажмите клавиши SHIFT+ВВОД, чтобы выполнить код в этой ячейке.

    df = spark.read.table(table_name)
    
  3. Чтобы просмотреть данные в только что созданной таблице, скопируйте следующий код в пустую ячейку, а затем нажмите клавиши SHIFT+ВВОД, чтобы выполнить код в этой ячейке.

    display(df)
    

Дополнительные сведения об интерактивных параметрах визуализации данных см. в разделе "Визуализации" в записных книжках Databricks.

Шаг 6. Планирование задания

Записные книжки Databricks можно запускать в качестве рабочих сценариев, добавляя их в виде задачи в задание Databricks. В этом шаге вы создадите новое задание, которое можно активировать вручную.

Чтобы запланировать выполнение записной книжки в качестве задачи, выполните следующие действия.

  1. Нажмите Расписание справа от строки заголовка.
  2. Введите уникальное Имя задания.
  3. Выберите Вручную.
  4. В раскрывающемся списке Кластер выберите кластер, созданный на шаге 1.
  5. Нажмите кнопку Создать.
  6. В открывшемся окне нажмите Запустить сейчас.
  7. Чтобы просмотреть результаты выполнения задания, щелкните Внешний канал значок рядом с меткой времени последнего выполнения .

Дополнительные сведения о заданиях см. в статье "Что такое задания Azure Databricks?".

Шаг 7. Запрос таблицы из Databricks SQL

Любой пользователь с USE CATALOG разрешением на текущий каталог, USE SCHEMA разрешение на текущую схему и SELECT разрешения на таблицу могут запрашивать содержимое таблицы из предпочтительного API Databricks.

Для выполнения запросов в Databricks SQL требуется доступ к работающему хранилищу SQL.

Таблица, созданная ранее в этом руководстве, называется target_table. Можно создать запросы к ней с помощью каталога, предоставленного в первой ячейке, и базы данных с шаблоном e2e_lakehouse_<your-username>. Вы можете использовать Обозреватель каталога для поиска созданных объектов данных.

Дополнительные интеграции

Дополнительные сведения об интеграции и средствах для инжиниринга данных с помощью Azure Databricks: