Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Разработка и тестирование кода конвейера отличается от других рабочих нагрузок Apache Spark. В этой статье представлен обзор поддерживаемой функциональности, лучших практик и соображений при разработке кода конвейера. Чтобы ознакомиться с дополнительными рекомендациями и лучшими практиками, см. раздел "Применение лучших методов разработки программного обеспечения и DevOps к конвейерам".
Замечание
Чтобы проверить код или запустить обновление, необходимо добавить исходный код в конфигурацию конвейера. См. раздел "Настройка конвейеров".
Какие файлы допустимы для исходного кода конвейера?
Код конвейера может быть Python или SQL. Вы можете использовать сочетание файлов исходного кода Python и SQL, поддерживаемых одним конвейером, но каждый файл может содержать только один язык. См. статью "Разработка кода конвейера с помощью Python и разработка декларативного конвейера Spark Lakeflow с помощью SQL".
Исходные файлы для конвейеров хранятся в рабочей области. Файлы рабочей области представляют скрипты Python или SQL, созданные в редакторе Конвейеров Lakeflow. Вы также можете изменить файлы локально в предпочтительной интегрированной среде разработки и синхронизировать их с рабочей областью. Сведения о файлах в рабочей области см. в разделе "Что такое файлы рабочей области?". Сведения о редактировании с помощью редактора Конвейеров Lakeflow см. в разделе "Разработка и отладка конвейеров ETL" с помощью редактора конвейеров Lakeflow. Сведения о создании кода в локальной интегрированной среде разработки см. в статье "Разработка кода конвейера в вашей локальной среде разработки".
При разработке кода Python в качестве модулей или библиотек необходимо установить и импортировать код, а затем вызвать методы из файла Python, настроенного как исходный код. См. статью "Управление зависимостями Python" для конвейеров.
Замечание
Если вам нужно использовать произвольные команды SQL в записной книжке Python, можно использовать шаблон синтаксиса spark.sql("<QUERY>") для запуска SQL в качестве кода Python.
Функции каталога Unity позволяют регистрировать произвольные пользовательские функции Python для использования в SQL. См. функции, определяемые пользователем (UDF), в каталоге Unity.
Общие сведения о функциях разработки конвейеров
Конвейеры расширяют возможности использования множества функций разработки Azure Databricks, а также вводят новые функции и концепции. В следующей таблице представлен краткий обзор концепций и функций, поддерживающих разработку кода конвейера:
| Функция | Description |
|---|---|
| Режим разработки | Интерактивный запуск конвейеров (выбирая обновление через редактор конвейеров Lakeflow) будет использовать режим разработки. Новые конвейеры запускаются с отключенным режимом разработки при автоматическом запуске с помощью расписания или автоматического триггера. См. режим разработки. |
| Сухой запуск | Обновление сухого запуска проверяет правильность исходного кода конвейера без выполнения обновления в каких-либо таблицах. См. Проверка трубопровода на ошибки без ожидания обновления таблиц. |
| Редактор конвейеров Lakeflow | Файлы Python и SQL, настроенные в качестве исходного кода для конвейеров, предоставляют интерактивные параметры проверки кода и выполнения обновлений. См. статью "Разработка и отладка конвейеров ETL" с помощью редактора конвейеров Lakeflow. |
| Параметры | Используйте параметры в конфигурациях исходного кода и конвейера, чтобы упростить тестирование и расширяемость. См. раздел "Использование параметров с конвейерами". |
| Пакеты активов Databricks | Пакеты ресурсов Databricks позволяют перемещать конфигурации конвейера и исходный код между рабочими областями. См. статью "Преобразование конвейера в проект пакета активов Databricks". |
Создание примеров наборов данных для разработки и тестирования
Databricks рекомендует создавать наборы данных для разработки и тестирования, чтобы проверять логику конвейера с ожидаемыми данными и потенциально некорректными или поврежденными записями. Существует несколько способов создания наборов данных, которые могут быть полезны для разработки и тестирования, включая следующие:
- Выберите подмножество данных из рабочего набора данных.
- Используйте анонимные или искусственные созданные данные для источников, содержащих личные данные. Сведения о руководстве по созданию данных для тестирования с помощью
fakerбиблиотеки см. в руководстве по созданию конвейера ETL с помощью отслеживания измененных данных. - Создайте тестовые данные с четко определенными результатами, основываясь на логике последующих трансформаций.
- Предвидите потенциальные повреждения данных, неправильные записи и изменения вышестоящих данных путем создания записей, которые нарушают ожидания схемы данных.
Например, если у вас есть файл, определяющий набор данных с помощью следующего кода:
CREATE OR REFRESH STREAMING TABLE input_data
AS SELECT * FROM STREAM read_files(
"/production/data",
format => "json")
Вы можете создать пример набора данных, содержащего подмножество записей с помощью запроса, как показано ниже.
CREATE OR REFRESH MATERIALIZED VIEW input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading
В следующем примере показано фильтрация опубликованных данных для создания подмножества рабочих данных для разработки или тестирования:
CREATE OR REFRESH MATERIALIZED VIEW input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY
Чтобы использовать эти различные наборы данных, создайте несколько конвейеров с исходным кодом, реализующим логику преобразования. Каждый конвейер может считывать данные из input_data набора данных, но настроен так, чтобы включать файл, который создаёт набор данных, относящийся к среде.
Как конвейерные наборы данных обрабатывают данные?
В следующей таблице описано, как материализованные представления, потоковые таблицы и представления обрабатывают данные.
| Тип набора данных | Как записи обрабатываются с помощью определенных запросов? |
|---|---|
| Потоковая таблица | Каждая запись обрабатывается ровно один раз. Предполагается, что это источник, в который можно только добавлять данные. |
| Материализованное представление | Записи обрабатываются по мере необходимости, чтобы получить точные результаты для текущего состояния данных. Материализованные представления должны использоваться для задач обработки данных, таких как преобразования, агрегаты или предварительные вычисления медленных запросов и часто используемые вычисления. Результаты кэшируются между обновлениями. |
| View | При каждом запросе представления записи обрабатываются. Используйте представления для промежуточных преобразований и проверок качества данных, которые не должны публиковаться в общедоступных наборах данных. |
Объявите ваши первые наборы данных в конвейерах
Пайплайны вводят новый синтаксис для Python и SQL. Сведения об основах синтаксиса конвейера см. в статье "Разработка кода конвейера с помощью Python и разработка декларативного конвейера Spark Lakeflow Spark с помощью SQL".
Замечание
Конвейерные линии отделяют определения наборов данных от обработки обновлений, а исходный код конвейера не предназначен для интерактивного выполнения.
Как настроить конвейеры?
Параметры конвейера делятся на две широкие категории:
- Конфигурации, определяющие коллекцию файлов (известных как исходный код), которые используют синтаксис пайплайна для объявления наборов данных.
- Конфигурации, управляющие инфраструктурой конвейера, управлением зависимостями, обработкой обновлений и сохранением таблиц в рабочей области.
Большинство конфигураций являются необязательными, но некоторые требуют внимательного внимания, особенно при настройке рабочих конвейеров. К ним относятся следующие:
- Чтобы сделать данные доступными за пределами конвейера, необходимо объявить целевую схему для публикации в хранилище метаданных Hive или целевой каталог и целевую схему для публикации в каталоге Unity.
- Разрешения доступа к данным настраиваются через кластер, используемый для выполнения. Убедитесь, что кластер имеет соответствующие разрешения, настроенные для источников данных и для целевого местоположения хранилища , если это указано.
Подробности об использовании Python и SQL для написания исходного кода для потоков данных смотрите в справочной документации по языку SQL для конвейеров и справочной документации по языку Python для декларативных конвейеров Lakeflow Spark.
Дополнительные сведения о параметрах и конфигурациях конвейеров см. в разделе "Настройка конвейеров".
Развертывание первого конвейера и активация обновлений
Чтобы обработать данные с помощью SDP, настройте конвейер. После настройки конвейера можно активировать обновление, чтобы вычислить результаты для каждого набора данных в конвейере. Сведения о начале работы с конвейерами см. в руководстве по созданию конвейера ETL с помощью отслеживания измененных данных.
Что такое обновление конвейера?
Конвейеры развертывают инфраструктуру и пересчитывают состояние данных, когда вы запускаете обновление . Обновление выполняет следующее:
- Запускает кластер с правильной конфигурацией.
- Обнаруживает все таблицы и представления, определенные и проверяет наличие ошибок анализа, таких как недопустимые имена столбцов, отсутствующие зависимости и синтаксические ошибки.
- Создает или обновляет таблицы и представления с самыми последними доступными данными.
Конвейерные процессы могут запускаться непрерывно или по расписанию в зависимости от требований к затратам и задержкам вашей задачи. См. статью "Запуск обновления конвейера".
Прием данных с помощью конвейеров
Конвейеры поддерживают все источники данных, доступные в Azure Databricks.
Databricks рекомендует использовать потоковые таблицы для большинства сценариев загрузки данных. Для файлов, поступающих в облачное хранилище объектов, Databricks рекомендует автозагрузчик. Вы можете напрямую получать данные с помощью канала из большинства шин сообщений.
Дополнительные сведения о настройке доступа к облачному хранилищу см. в разделе "Конфигурация облачного хранилища".
Для форматов, не поддерживаемых автозагрузчиком, можно использовать Python или SQL для запроса любого формата, поддерживаемого Apache Spark. См. сведения о загрузке данных в конвейерах.
Мониторинг и применение качества данных
Вы можете использовать ожидания для указания элементов управления качеством данных для содержимого набора данных. В отличие от ограничения CHECK в традиционной базе данных, которая предотвращает добавление записей, которые не выполняют ограничение, ожидания обеспечивают гибкость при обработке данных, которые не отвечают требованиям к качеству данных. Эта способность позволяет вам обрабатывать и хранить данные, которые могут быть неупорядоченными, и данные, которые должны соответствовать строгим требованиям к качеству. См. Управление качеством данных, используя ожидания конвейера.
Как связаны декларативные конвейеры Spark Lakeflow и Delta Lake?
SDP расширяет функциональные возможности Delta Lake. Так как таблицы, созданные и управляемые пайплайнами, являются таблицами Delta, они имеют одинаковые гарантии и функции, предоставляемые Delta Lake. См. статью "Что такое Delta Lake в Azure Databricks?".
Пайплайны добавляют несколько свойств таблицы вдобавок к множеству свойств, которые могут быть заданы в Delta Lake. См. справочник по свойствам конвейера и справочник по свойствам таблицы.
Создание и управление таблицами с помощью конвейеров данных
Azure Databricks автоматически управляет таблицами, созданными конвейерами, определяя способ обработки обновлений для правильного вычисления текущего состояния таблицы и выполнения ряда задач обслуживания и оптимизации.
Для большинства операций следует разрешить конвейеру обрабатывать все обновления, вставки и удаления в целевую таблицу. Для получения подробностей и ограничений см. Сохранение вручную произведённых удалений или обновлений.
Задачи обслуживания, выполняемые конвейерами
Azure Databricks выполняет задачи обслуживания с оптимальной частотой в таблицах, управляемых конвейерами, с помощью прогнозной оптимизации. Обслуживание может повысить производительность запросов и сократить затраты, удалив старые версии таблиц. К ним относятся полные операции OPTIMIZE и VACUUM. Задачи обслуживания выполняются по расписанию, определяемом прогнозной оптимизацией, и только если обновление конвейера выполняется с момента предыдущего обслуживания.
Сведения о том, как часто выполняются прогнозные оптимизации, а также понять затраты на обслуживание, см. в справочнике по системной таблице прогнозной оптимизации.
Ограничения
Список ограничений см. в разделе "Ограничения конвейера".
Список требований и ограничений, относящихся к использованию конвейеров с каталогом Unity, см. в разделе "Использование каталога Unity с конвейерами"
Дополнительные ресурсы
- В REST API Databricks имеется полная поддержка конвейеров. См. REST API конвейеров данных.
- Сведения о параметрах конвейера и таблицы см. в справочнике по свойствам конвейера.
- Справочник по языку Pipeline SQL.
- Справочник по языку Python для Декларативного конвейера Spark Lakeflow.