Поделиться через


Разработка декларативных конвейеров данных Lakeflow Spark

Разработка и тестирование кода конвейера отличается от других рабочих нагрузок Apache Spark. В этой статье представлен обзор поддерживаемой функциональности, лучших практик и соображений при разработке кода конвейера. Чтобы ознакомиться с дополнительными рекомендациями и лучшими практиками, см. раздел "Применение лучших методов разработки программного обеспечения и DevOps к конвейерам".

Замечание

Чтобы проверить код или запустить обновление, необходимо добавить исходный код в конфигурацию конвейера. См. раздел "Настройка конвейеров".

Какие файлы допустимы для исходного кода конвейера?

Код конвейера может быть Python или SQL. Вы можете использовать сочетание файлов исходного кода Python и SQL, поддерживаемых одним конвейером, но каждый файл может содержать только один язык. См. статью "Разработка кода конвейера с помощью Python и разработка декларативного конвейера Spark Lakeflow с помощью SQL".

Исходные файлы для конвейеров хранятся в рабочей области. Файлы рабочей области представляют скрипты Python или SQL, созданные в редакторе Конвейеров Lakeflow. Вы также можете изменить файлы локально в предпочтительной интегрированной среде разработки и синхронизировать их с рабочей областью. Сведения о файлах в рабочей области см. в разделе "Что такое файлы рабочей области?". Сведения о редактировании с помощью редактора Конвейеров Lakeflow см. в разделе "Разработка и отладка конвейеров ETL" с помощью редактора конвейеров Lakeflow. Сведения о создании кода в локальной интегрированной среде разработки см. в статье "Разработка кода конвейера в вашей локальной среде разработки".

При разработке кода Python в качестве модулей или библиотек необходимо установить и импортировать код, а затем вызвать методы из файла Python, настроенного как исходный код. См. статью "Управление зависимостями Python" для конвейеров.

Замечание

Если вам нужно использовать произвольные команды SQL в записной книжке Python, можно использовать шаблон синтаксиса spark.sql("<QUERY>") для запуска SQL в качестве кода Python.

Функции каталога Unity позволяют регистрировать произвольные пользовательские функции Python для использования в SQL. См. функции, определяемые пользователем (UDF), в каталоге Unity.

Общие сведения о функциях разработки конвейеров

Конвейеры расширяют возможности использования множества функций разработки Azure Databricks, а также вводят новые функции и концепции. В следующей таблице представлен краткий обзор концепций и функций, поддерживающих разработку кода конвейера:

Функция Description
Режим разработки Интерактивный запуск конвейеров (выбирая обновление через редактор конвейеров Lakeflow) будет использовать режим разработки. Новые конвейеры запускаются с отключенным режимом разработки при автоматическом запуске с помощью расписания или автоматического триггера. См. режим разработки.
Сухой запуск Обновление сухого запуска проверяет правильность исходного кода конвейера без выполнения обновления в каких-либо таблицах. См. Проверка трубопровода на ошибки без ожидания обновления таблиц.
Редактор конвейеров Lakeflow Файлы Python и SQL, настроенные в качестве исходного кода для конвейеров, предоставляют интерактивные параметры проверки кода и выполнения обновлений. См. статью "Разработка и отладка конвейеров ETL" с помощью редактора конвейеров Lakeflow.
Параметры Используйте параметры в конфигурациях исходного кода и конвейера, чтобы упростить тестирование и расширяемость. См. раздел "Использование параметров с конвейерами".
Пакеты активов Databricks Пакеты ресурсов Databricks позволяют перемещать конфигурации конвейера и исходный код между рабочими областями. См. статью "Преобразование конвейера в проект пакета активов Databricks".

Создание примеров наборов данных для разработки и тестирования

Databricks рекомендует создавать наборы данных для разработки и тестирования, чтобы проверять логику конвейера с ожидаемыми данными и потенциально некорректными или поврежденными записями. Существует несколько способов создания наборов данных, которые могут быть полезны для разработки и тестирования, включая следующие:

  • Выберите подмножество данных из рабочего набора данных.
  • Используйте анонимные или искусственные созданные данные для источников, содержащих личные данные. Сведения о руководстве по созданию данных для тестирования с помощью faker библиотеки см. в руководстве по созданию конвейера ETL с помощью отслеживания измененных данных.
  • Создайте тестовые данные с четко определенными результатами, основываясь на логике последующих трансформаций.
  • Предвидите потенциальные повреждения данных, неправильные записи и изменения вышестоящих данных путем создания записей, которые нарушают ожидания схемы данных.

Например, если у вас есть файл, определяющий набор данных с помощью следующего кода:

CREATE OR REFRESH STREAMING TABLE input_data
AS SELECT * FROM STREAM read_files(
  "/production/data",
  format => "json")

Вы можете создать пример набора данных, содержащего подмножество записей с помощью запроса, как показано ниже.

CREATE OR REFRESH MATERIALIZED VIEW input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

В следующем примере показано фильтрация опубликованных данных для создания подмножества рабочих данных для разработки или тестирования:

CREATE OR REFRESH MATERIALIZED VIEW input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

Чтобы использовать эти различные наборы данных, создайте несколько конвейеров с исходным кодом, реализующим логику преобразования. Каждый конвейер может считывать данные из input_data набора данных, но настроен так, чтобы включать файл, который создаёт набор данных, относящийся к среде.

Как конвейерные наборы данных обрабатывают данные?

В следующей таблице описано, как материализованные представления, потоковые таблицы и представления обрабатывают данные.

Тип набора данных Как записи обрабатываются с помощью определенных запросов?
Потоковая таблица Каждая запись обрабатывается ровно один раз. Предполагается, что это источник, в который можно только добавлять данные.
Материализованное представление Записи обрабатываются по мере необходимости, чтобы получить точные результаты для текущего состояния данных. Материализованные представления должны использоваться для задач обработки данных, таких как преобразования, агрегаты или предварительные вычисления медленных запросов и часто используемые вычисления. Результаты кэшируются между обновлениями.
View При каждом запросе представления записи обрабатываются. Используйте представления для промежуточных преобразований и проверок качества данных, которые не должны публиковаться в общедоступных наборах данных.

Объявите ваши первые наборы данных в конвейерах

Пайплайны вводят новый синтаксис для Python и SQL. Сведения об основах синтаксиса конвейера см. в статье "Разработка кода конвейера с помощью Python и разработка декларативного конвейера Spark Lakeflow Spark с помощью SQL".

Замечание

Конвейерные линии отделяют определения наборов данных от обработки обновлений, а исходный код конвейера не предназначен для интерактивного выполнения.

Как настроить конвейеры?

Параметры конвейера делятся на две широкие категории:

  1. Конфигурации, определяющие коллекцию файлов (известных как исходный код), которые используют синтаксис пайплайна для объявления наборов данных.
  2. Конфигурации, управляющие инфраструктурой конвейера, управлением зависимостями, обработкой обновлений и сохранением таблиц в рабочей области.

Большинство конфигураций являются необязательными, но некоторые требуют внимательного внимания, особенно при настройке рабочих конвейеров. К ним относятся следующие:

  • Чтобы сделать данные доступными за пределами конвейера, необходимо объявить целевую схему для публикации в хранилище метаданных Hive или целевой каталог и целевую схему для публикации в каталоге Unity.
  • Разрешения доступа к данным настраиваются через кластер, используемый для выполнения. Убедитесь, что кластер имеет соответствующие разрешения, настроенные для источников данных и для целевого местоположения хранилища , если это указано.

Подробности об использовании Python и SQL для написания исходного кода для потоков данных смотрите в справочной документации по языку SQL для конвейеров и справочной документации по языку Python для декларативных конвейеров Lakeflow Spark.

Дополнительные сведения о параметрах и конфигурациях конвейеров см. в разделе "Настройка конвейеров".

Развертывание первого конвейера и активация обновлений

Чтобы обработать данные с помощью SDP, настройте конвейер. После настройки конвейера можно активировать обновление, чтобы вычислить результаты для каждого набора данных в конвейере. Сведения о начале работы с конвейерами см. в руководстве по созданию конвейера ETL с помощью отслеживания измененных данных.

Что такое обновление конвейера?

Конвейеры развертывают инфраструктуру и пересчитывают состояние данных, когда вы запускаете обновление . Обновление выполняет следующее:

  • Запускает кластер с правильной конфигурацией.
  • Обнаруживает все таблицы и представления, определенные и проверяет наличие ошибок анализа, таких как недопустимые имена столбцов, отсутствующие зависимости и синтаксические ошибки.
  • Создает или обновляет таблицы и представления с самыми последними доступными данными.

Конвейерные процессы могут запускаться непрерывно или по расписанию в зависимости от требований к затратам и задержкам вашей задачи. См. статью "Запуск обновления конвейера".

Прием данных с помощью конвейеров

Конвейеры поддерживают все источники данных, доступные в Azure Databricks.

Databricks рекомендует использовать потоковые таблицы для большинства сценариев загрузки данных. Для файлов, поступающих в облачное хранилище объектов, Databricks рекомендует автозагрузчик. Вы можете напрямую получать данные с помощью канала из большинства шин сообщений.

Дополнительные сведения о настройке доступа к облачному хранилищу см. в разделе "Конфигурация облачного хранилища".

Для форматов, не поддерживаемых автозагрузчиком, можно использовать Python или SQL для запроса любого формата, поддерживаемого Apache Spark. См. сведения о загрузке данных в конвейерах.

Мониторинг и применение качества данных

Вы можете использовать ожидания для указания элементов управления качеством данных для содержимого набора данных. В отличие от ограничения CHECK в традиционной базе данных, которая предотвращает добавление записей, которые не выполняют ограничение, ожидания обеспечивают гибкость при обработке данных, которые не отвечают требованиям к качеству данных. Эта способность позволяет вам обрабатывать и хранить данные, которые могут быть неупорядоченными, и данные, которые должны соответствовать строгим требованиям к качеству. См. Управление качеством данных, используя ожидания конвейера.

SDP расширяет функциональные возможности Delta Lake. Так как таблицы, созданные и управляемые пайплайнами, являются таблицами Delta, они имеют одинаковые гарантии и функции, предоставляемые Delta Lake. См. статью "Что такое Delta Lake в Azure Databricks?".

Пайплайны добавляют несколько свойств таблицы вдобавок к множеству свойств, которые могут быть заданы в Delta Lake. См. справочник по свойствам конвейера и справочник по свойствам таблицы.

Создание и управление таблицами с помощью конвейеров данных

Azure Databricks автоматически управляет таблицами, созданными конвейерами, определяя способ обработки обновлений для правильного вычисления текущего состояния таблицы и выполнения ряда задач обслуживания и оптимизации.

Для большинства операций следует разрешить конвейеру обрабатывать все обновления, вставки и удаления в целевую таблицу. Для получения подробностей и ограничений см. Сохранение вручную произведённых удалений или обновлений.

Задачи обслуживания, выполняемые конвейерами

Azure Databricks выполняет задачи обслуживания с оптимальной частотой в таблицах, управляемых конвейерами, с помощью прогнозной оптимизации. Обслуживание может повысить производительность запросов и сократить затраты, удалив старые версии таблиц. К ним относятся полные операции OPTIMIZE и VACUUM. Задачи обслуживания выполняются по расписанию, определяемом прогнозной оптимизацией, и только если обновление конвейера выполняется с момента предыдущего обслуживания.

Сведения о том, как часто выполняются прогнозные оптимизации, а также понять затраты на обслуживание, см. в справочнике по системной таблице прогнозной оптимизации.

Ограничения

Список ограничений см. в разделе "Ограничения конвейера".

Список требований и ограничений, относящихся к использованию конвейеров с каталогом Unity, см. в разделе "Использование каталога Unity с конвейерами"

Дополнительные ресурсы