Руководство. Запуск сквозного конвейера аналитики Lakehouse
В этом руководстве показано, как настроить комплексный конвейер аналитики для озера Azure Databricks.
Внимание
В этом руководстве используются интерактивные записные книжки для выполнения распространенных задач извлечения, преобразования и загрузки в Python в кластерах с поддержкой каталога Unity. Если вы не используете каталог Unity, см. статью Выполнение первой рабочей нагрузки извлечения, преобразования и загрузки в Azure Databricks.
Задачи в этом руководстве
К концу этой статьи вы сможете свободно выполнять следующие операции:
- Запуск вычислительного кластера с поддержкой каталога Unity.
- Создание записной книжки Databricks.
- Запись и чтение данных из внешнего расположения каталога Unity.
- Настройка добавочного приема данных в таблицу каталога Unity с помощью автозагрузчика.
- Выполнение ячеек записной книжки для обработки, запроса и предварительного просмотра данных.
- Планирование записной книжки в качестве задания Databricks.
- Запрос к таблицам каталога Unity из Databricks SQL
Azure Databricks предоставляет набор готовых к использованию средств, позволяющих специалистам по обработке и анализу данных быстро разрабатывать и развертывать конвейеры извлечения, преобразования и загрузки (ETL). Каталог Unity позволяет администраторам данных настраивать и защищать учетные данные хранения, внешние расположения и объекты базы данных для пользователей по всей организации. Databricks SQL позволяет аналитикам выполнять SQL-запросы к тем же таблицам, которые используются в рабочих нагрузках ETL, что позволяет выполнять бизнес-аналитику в реальном времени в большом масштабе.
Вы также можете использовать разностные динамические таблицы для создания конвейеров ETL. Платформа Databricks создала разностные динамические таблицы, чтобы снизить сложность сборки, развертывания и обслуживания рабочих конвейеров ETL. См . руководство. Запуск первого конвейера live tables Delta Live Tables.
Требования
Примечание.
Если у вас нет прав управления кластером, вы по-прежнему можете выполнить большинство описанных ниже действий при наличии у вас доступа к кластеру.
Шаг 1. Создание кластера
Чтобы выполнить исследовательский анализ данных и инжиниринг данных, создайте кластер для предоставления вычислительных ресурсов, необходимых для выполнения команд.
- Щелкните "Вычисления" на боковой панели.
- Нажмите кнопку "Создать" на боковой панели и выберите "Кластер". Откроется страница создания кластера или вычислений.
- Укажите уникальное имя для кластера.
- Выберите переключатель Один узел.
- Выберите одного пользователя из раскрывающегося списка "Режим доступа".
- Убедитесь, что ваш адрес электронной почты отображается в поле "Один пользователь ".
- Выберите требуемую версию среды выполнения Databricks, 11.1 или более поздней, чтобы использовать каталог Unity.
- Нажмите кнопку "Создать вычисления" , чтобы создать кластер.
Дополнительные сведения о кластерах Databricks см. в статье "Вычисления".
Шаг 2. Создание записной книжки Databricks
Чтобы создать записную книжку в рабочей области, нажмите кнопку "Создать" на боковой панели и нажмите кнопку "Записная книжка". Пустая записная книжка открывается в рабочей области.
Дополнительные сведения о создании записных книжек и управлении ими см. в статье Управление записными книжками.
Шаг 3. Запись и чтение данных из внешнего расположения, управляемого каталогом Unity
В Databricks рекомендуется использовать Автозагрузчик для добавочного приема данных. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище.
Используйте каталог Unity для управления безопасным доступом к внешним расположениям. Пользователи или субъекты-службы с разрешениями READ FILES
для внешнего расположения могут использовать автозагрузчик для приема данных.
Обычно данные поступают во внешнее расположение из-за операций записи из других систем. В этой демонстрации можно имитировать поступление данных, записав JSON-файлы во внешнее расположение.
Скопируйте приведенный ниже код в ячейку записной книжки. Замените строковое значение catalog
на имя каталога с разрешениями CREATE CATALOG
и USE CATALOG
. Замените строковое значение external_location
на путь к внешнему расположению с разрешениями READ FILES
, WRITE FILES
и CREATE EXTERNAL TABLE
.
Внешние расположения можно определить как весь контейнер хранилища, но часто указывают путь к каталогу, вложенному в контейнер.
Правильный формат пути к внешнему расположению: "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
При выполнении этой ячейки должна быть напечатана строка, которая считывает 12 байтов, печатает строку "Hello world!", и отображает все базы данных, представленные в каталоге. Если вы не можете получить эту ячейку для запуска, убедитесь, что вы находитесь в рабочей области с поддержкой каталога Unity, и запросите соответствующие разрешения от администратора рабочей области, чтобы выполнить работу с этим руководством.
Приведенный ниже код Python использует ваш адрес электронной почты для создания уникальной базы данных в указанном каталоге и уникальном расположении хранилища во внешнем расположении. При выполнении этой ячейки будут удалены все данные, связанные с этим руководством, что позволяет выполнять этот пример индемпотентно. Класс определяется и создается экземпляр, который будет использоваться для имитации пакетов данных, поступающих из подключенной системы к исходному внешнему расположению.
Скопируйте этот код в новую ячейку в записной книжке и выполните его, чтобы настроить среду.
Примечание.
Переменные, определенные в этом коде, должны позволить безопасно выполнять его без риска конфликта с существующими ресурсами рабочей области или другими пользователями. Ограниченные разрешения сети или хранилища приведут к ошибкам при выполнении этого кода; обратитесь к администратору рабочей области, чтобы решить проблему этих ограничений.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
Теперь пакет данных можно приземлить, скопировать следующий код в ячейку и выполнить его. Эту ячейку можно выполнить вручную до 60 раз, чтобы активировать поступление новых данных.
RawData.land_batch()
Шаг 4. Настройка автозагрузчика для приема данных в каталог Unity
Databricks рекомендует хранить данные с использованием формата Delta Lake. Delta Lake — это уровень хранения открытого кода, который предоставляет транзакции ACID и обеспечивает гибридное решение "хранилище и озеро данных". Delta Lake — это формат по умолчанию для таблиц, созданных в Databricks.
Чтобы настроить автозагрузчик для приема данных в таблицу каталога Unity, скопируйте следующий код в пустую ячейку записной книжки:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Дополнительные сведения об автозагрузчике см. в статье Автозагрузчик.
Дополнительные сведения о структурированной потоковой передаче с помощью каталога Unity см. в разделе Использование каталога Unity с структурированной потоковой передачей.
Шаг 5. Обработка данных и взаимодействие с ними
Записные книжки выполняют логические ячейки последовательно. Выполните следующие действия, чтобы выполнить логику в ячейке:
Чтобы запустить ячейку, выполненную на предыдущем шаге, выберите ячейку и нажмите клавиши SHIFT+ВВОД.
Чтобы запросить только что созданную таблицу, скопируйте следующий код в пустую ячейку, а затем нажмите клавиши SHIFT+ВВОД, чтобы выполнить код в этой ячейке.
df = spark.read.table(table)
Чтобы просмотреть данные в только что созданной таблице, скопируйте следующий код в пустую ячейку, а затем нажмите клавиши SHIFT+ВВОД, чтобы выполнить код в этой ячейке.
display(df)
Дополнительные сведения об интерактивных параметрах визуализации данных см. в разделе "Визуализации" в записных книжках Databricks.
Шаг 6. Планирование задания
Записные книжки Databricks можно запускать в качестве рабочих сценариев, добавляя их в виде задачи в задание Databricks. В этом шаге вы создадите новое задание, которое можно активировать вручную.
Чтобы запланировать выполнение записной книжки в качестве задачи, выполните следующие действия.
- Нажмите Расписание справа от строки заголовка.
- Введите уникальное Имя задания.
- Выберите Вручную.
- В раскрывающемся списке Кластер выберите кластер, созданный на шаге 1.
- Нажмите кнопку Создать.
- В открывшемся окне нажмите Запустить сейчас.
- Чтобы просмотреть результаты выполнения задания, щелкните значок рядом с меткой времени последнего выполнения .
Дополнительные сведения о заданиях см. в разделе "Что такое задания Databricks?".
Шаг 7. Запрос таблицы из Databricks SQL
Любой пользователь с USE CATALOG
разрешением на текущий каталог, USE SCHEMA
разрешение на текущую схему и SELECT
разрешения на таблицу могут запрашивать содержимое таблицы из предпочтительного API Databricks.
Для выполнения запросов в Databricks SQL требуется доступ к работающему хранилищу SQL.
Таблица, созданная ранее в этом руководстве, называется target_table
. Можно создать запросы к ней с помощью каталога, предоставленного в первой ячейке, и базы данных с шаблоном e2e_lakehouse_<your-username>
. Обозреватель каталогов можно использовать для поиска созданных объектов данных.
Дополнительные интеграции
Дополнительные сведения об интеграции и средствах для инжиниринга данных с помощью Azure Databricks: