Использование каталога Unity с конвейерами

Databricks рекомендует настраивать декларативные конвейеры Lakeflow Spark с помощью Unity Catalog. Использование каталога Unity используется по умолчанию для только что созданных конвейеров.

Конвейеры, настроенные с использованием Unity Catalog, публикуют все заданные материализованные представления и потоковые таблицы в указанный каталог и схему. Конвейеры каталога Unity могут читать данные из других таблиц и томов каталога Unity.

Для управления разрешениями для таблиц, созданных конвейером каталога Unity, используйте GRANT и REVOKE.

Замечание

В этой статье рассматриваются функции текущего режима публикации по умолчанию для конвейеров. Пайплайны, созданные до 5 февраля 2025 г., могут использовать устаревший режим публикации и виртуальную схему LIVE. См. LIVE схему (устаревшую версию).

Требования

Чтобы создать таблицы потоковой передачи и материализованные представления в целевой схеме в каталоге Unity, необходимо иметь следующие разрешения на схему и родительский каталог:

  • Привилегии USE CATALOG для целевого каталога.
  • привилегии CREATE MATERIALIZED VIEW и USE SCHEMA для целевой схемы, если поток данных создает материализованные представления.
  • CREATE TABLE и USE SCHEMA привилегии на целевую схему, если ваш конвейер создает потоковые таблицы .
  • Если ваш конвейер создает новые схемы, у вас должны быть привилегии USE CATALOG и CREATE SCHEMA на целевой каталог.

Требования к вычислениям для запуска конвейера с поддержкой каталога Unity:

  • Вычислительный ресурс необходимо настроить с помощью стандартного режима доступа. Выделенные вычислительные ресурсы не поддерживаются. См. режимы доступа.

Вычисления, необходимые для запроса таблиц, созданных конвейерами с помощью каталога Unity (включая потоковые таблицы и материализованные представления), включают любое из следующих элементов:

  • Хранилища SQL
  • Стандартный режим доступа вычисления на Databricks Runtime версии 13.3 LTS или выше.
  • Режим выделенного доступа к вычислениям, если управление доступом с точной настройкой включено на выделенных вычислениях (то есть они работают на Databricks Runtime 15.4 или выше, и безсерверные вычисления включены для рабочего пространства). Дополнительные сведения см. в разделе "Детализированное управление доступом на выделенных вычислительных ресурсах".
  • Режим выделенного доступа вычислений с версиями 13.3 LTS по 15.3, только если запрос выполняет владелец таблицы.

Действуют дополнительные ограничения вычислительных возможностей. См. следующий раздел.

Ограничения

Ниже приведены ограничения при использовании каталога Unity с конвейерами:

  • Невозможно создать конвейер с поддержкой каталога Unity в рабочей области, подключенной к хранилищу метаданных, созданному в общедоступной предварительной версии каталога Unity. См. Обновление до наследования привилегий.
  • Файлы формата JAR не поддерживаются. Поддерживаются только сторонние библиотеки Python. См. Управление зависимостями Python для конвейеров.
  • Запросы языка обработки данных (DML), изменяющие схему потоковой таблицы, не поддерживаются.
  • Материализованное представление, созданное в конвейере, нельзя использовать как источник потока за пределами этого конвейера, например, в другом конвейере или в записной книжке ниже по потоку.
  • Данные для материализованных представлений и потоковых таблиц хранятся в местоположении хранения для содержащей схемы. Если расположение хранилища схемы не указано, таблицы хранятся в расположении хранилища каталога. Если расположения хранилища схемы и каталога не указаны, таблицы хранятся в корневом расположении хранилища метаданных.
  • На вкладке "Обозреватель Каталога" история не отображается для материализованных представлений.
  • Свойство LOCATION не поддерживается при определении таблицы.
  • Конвейеры обработки данных с поддержкой каталога Unity не могут публиковаться в хранилище метаданных Hive.
  • Глобальные скрипты инициализации не поддерживаются. Вместо этого используйте скрипты инициализации на уровне кластера.

Замечание

Базовые файлы, поддерживающие материализованные представления, могут включать данные из вышестоящих таблиц (включая возможные личные сведения), которые не отображаются в определении материализованного представления. Эти данные автоматически добавляются в базовое хранилище для поддержки добавочного обновления материализованных представлений.

Поскольку базовые файлы материализованного представления могут рисковать раскрытием данных из исходных таблиц, не входящих в схему материализованного представления, Databricks рекомендует не предоставлять общий доступ к базовому хранилищу ненадежным нижестоящим потребителям.

Например, предположим, что определение материала представления включает условие COUNT(DISTINCT field_a). Хотя определение материализованного представления включает только агрегированное предложение COUNT DISTINCT, базовые файлы будут содержать список фактических значений field_a.

Можно ли совместно использовать хранилище метаданных Hive и конвейеры каталога Unity?

Рабочая область может содержать конвейеры, использующие каталог Unity и устаревшее хранилище метаданных Hive. Однако один конвейер не может записывать данные в хранилище метаданных Hive и каталог Unity. Существующие конвейеры, которые записывают данные в хранилище метаданных Hive, нельзя обновить для использования каталога Unity. Чтобы перенести существующий конвейер, записывающий данные в метастор Hive, необходимо создать новый конвейер и повторно загрузить данные из исходного источника(ов). См. статью Создание конвейера каталога Unity путем клонирования конвейера хранилища метаданных Hive.

Существующие конвейеры, не использующие каталог Unity, не влияют на создание новых конвейеров, настроенных с помощью каталога Unity. Эти конвейеры продолжают сохранять данные в метахранилище Hive, используя настроенное место хранения.

Если в этом документе не указано иное, все существующие источники данных и функциональные возможности конвейера поддерживаются с конвейерами, используюющими каталог Unity. Интерфейсы Python и SQL поддерживаются в конвейерах, использующих каталог Unity.

Неактивные таблицы

Если конвейер настроен для сохранения данных в каталоге Unity, конвейер управляет жизненным циклом и разрешениями таблицы.

Таблицы могут стать неактивными, если их определение удалено из конвейера. Следующее обновление потока обработки помечает соответствующее материализованное представление или запись потоковой таблицы в качестве неактивной.

Если изменить каталог или схему конвейера по умолчанию и не использовать полные имена таблиц в исходном коде конвейера, следующий запуск конвейера создает материализованное представление или таблицу потоковой передачи в новом каталоге или схеме, а предыдущая материализованная таблица или таблица потоковой передачи в старом расположении помечена как неактивная.

Вы по-прежнему можете запрашивать неактивные таблицы, но конвейер больше не обновляет их. Для очистки материализованных представлений или потоковых таблиц выполните явно DROP таблицы. Неактивные таблицы будут удалены при удалении конвейера.

  • Вы можете восстановить удаленные таблицы в течение 7 дней с помощью UNDROP команды.
  • Чтобы сохранить прежнее поведение, при котором материализованное представление или запись из потоковой таблицы удаляется из Unity Catalog при следующем обновлении конвейера, задайте конфигурацию конвейера "pipelines.dropInactiveTables": "true". Фактические данные сохраняются в течение некоторого времени, чтобы их можно было восстановить, если они были удалены по ошибке. Данные могут быть восстановлены в течение 7 дней путем добавления материализованного представления или потоковой таблицы обратно в определение конвейера.

Удаление конвейера полностью (в отличие от удаления определения таблицы из источника конвейера) также удаляет все таблицы, определенные в этом конвейере. Пользовательский интерфейс запрашивает подтверждение удаления конвейера.

Удаление конвейера

При удалении конвейера каталога Unity связанные материализованные представления, потоковые таблицы и представления также удаляются.

Чтобы удалить конвейер и сохранить свои таблицы, используйте cascade поле в API. Сохраненные таблицы неактивны, но могут запрашиваться. Вы можете переместить неактивные таблицы в новый конвейер и, если они подключены к потоку, они повторно активируются. См. раздел "Перемещение таблиц между конвейерами".

DELETE /api/2.0/pipelines/{pipeline_id}?cascade=false

См. статью "Удаление конвейера" в документации по REST API Databricks.

Запись таблиц в каталог Unity из конвейера

Чтобы загрузить свои таблицы в каталог Unity, необходимо настроить пайплайн для взаимодействия с ним через рабочую область. Когда вы создаёте конвейер, выберите Unity Catalog в разделе «Параметры хранения», выберите каталог в выпадающем меню «Каталог» и выберите существующую схему или введите имя для новой схемы в выпадающем меню «Целевая схема». Для получения информации о каталогах Unity Catalog см. Что такое каталоги в Azure Databricks?. Чтобы узнать о схемах в Unity Catalog, см. раздел Что такое схемы в Azure Databricks?.

Загрузка данных в конвейер Unity Catalog

Конвейер, настроенный для использования каталога Unity, может считывать данные из:

  • Управляемые и внешние таблицы, представления, материальные представления и потоковые таблицы в Unity Catalog.
  • Таблицы и представления Hive metastore.
  • Автозагрузчик с помощью функции read_files() для чтения из внешних местоположений каталога Unity.
  • Apache Kafka и Amazon Kinesis.

Ниже приведены примеры чтения из каталога Unity и таблиц хранилища метаданных Hive.

Пакетная загрузка данных из таблицы каталога Unity

SQL

CREATE OR REFRESH MATERIALIZED VIEW
  table_name
AS SELECT
  *
FROM
  my_catalog.my_schema.table1;

Python

@dp.materialized_view
def table_name():
  return spark.read.table("my_catalog.my_schema.table")

Поток изменений из таблицы каталога Unity

SQL

CREATE OR REFRESH STREAMING TABLE
  table_name
AS SELECT
  *
FROM
  STREAM(my_catalog.my_schema.table1);

Python

@dp.table
def table_name():
  return spark.readStream.table("my_catalog.my_schema.table")

Загрузить данные из метастора Hive

Конвейер, использующий каталог Unity, может считывать данные из таблиц хранилища метаданных Hive с помощью каталога hive_metastore:

SQL

CREATE OR REFRESH MATERIALIZED VIEW
  table_name
AS SELECT
  *
FROM
  hive_metastore.some_schema.table;

Python

@dp.materialized_view
def table3():
  return spark.read.table("hive_metastore.some_schema.table")

Организовать загрузку данных с Auto Loader

SQL

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
  "/path/to/uc/external/location",
  format => "json"
)

Python

@dp.table(table_properties={"quality": "bronze"})
def table_name():
  return (
     spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .load(f"{path_to_uc_external_location}")
 )

Предоставление доступа к материализованным представлениям

По умолчанию только владелец конвейера имеет разрешение на запрос наборов данных, созданных этим конвейером. Вы можете предоставить другим пользователям возможность запрашивать таблицу с помощью инструкций GRANT и отменить доступ к запросу с помощью инструкций REVOKE. Дополнительные сведения о привилегиях в каталоге Unity см. в разделе Управление привилегиями в каталоге Unity.

Предоставить право на выборку из таблицы

GRANT SELECT ON TABLE
  my_catalog.my_schema.table_name
TO
  `user@databricks.com`

Отменить разрешение SELECT с таблицы

REVOKE SELECT ON TABLE
  my_catalog.my_schema.table_name
FROM
  `user@databricks.com`

Предоставить привилегии на создание таблиц или создание материализованного представления

GRANT CREATE { MATERIALIZED VIEW | TABLE } ON SCHEMA
  my_catalog.my_schema
TO
  { principal | user }

Просмотр родословной конвейера

Происхождение таблиц, определенных в конвейерах, видно в средстве просмотра каталогов. В пользовательском интерфейсе обозревателя каталогов отображаются входящие и исходящие таблицы для материализованных представлений или потоковых таблиц в конвейере с поддержкой Unity Catalog. Дополнительные сведения о происхождении каталога Unity см. в разделе " Происхождение данных" в каталоге Unity.

Для материализованного представления или потоковой передачи таблицы в конвейере с поддержкой каталога Unity пользовательский интерфейс происхождения обозревателя каталогов также будет связываться с конвейером, создающим материализованное представление или потоковую таблицу, если конвейер доступен из текущей рабочей области.

добавление, изменение или удаление данных в потоковой таблице

Инструкции языка обработки данных (DML), включая инструкции insert, update, delete и merge, можно использовать для изменения таблиц потоковой передачи, опубликованных в каталоге Unity. Поддержка запросов DML к таблицам потоковой передачи позволяет использовать такие варианты, как обновление таблиц для соответствия общему регламенту по защите данных (GDPR).

Замечание

  • Инструкции DML, изменяющие схему таблицы потоковой передачи, не поддерживаются. Убедитесь, что операторы DML не пытаются изменять схему таблицы.
  • Инструкции DML, обновляющие потоковую таблицу, могут выполняться только в общем кластере каталога Unity или хранилище SQL с помощью Databricks Runtime 13.3 LTS и более поздних версий.
  • Поскольку для потоковой передачи требуются источники данных с возможностью только добавления, если ваша обработка требует потоковой передачи из исходной потоковой таблицы с изменениями (например, с использованием операторов DML), задайте флаг skipChangeCommits при чтении потоковой таблицы-источника. При установке skipChangeCommits транзакции, которые удаляют или изменяют записи в исходной таблице, игнорируются. Если для обработки не требуется потоковая таблица, можно использовать материализованное представление (которое не имеет ограничения только для добавления) в качестве целевой таблицы.

Ниже приведены примеры инструкций DML для изменения записей в таблице потоковой передачи.

Удалите записи с определённым идентификатором:

DELETE FROM my_streaming_table WHERE id = 123;

Обновите записи с определенным идентификатором:

UPDATE my_streaming_table SET name = 'Jane Doe' WHERE id = 123;

Публикуйте таблицы с фильтрами строк и масками столбцов

фильтры строк позволяют указать функцию, которая применяется в качестве фильтра при каждом сканировании таблиц. Эти фильтры гарантируют, что последующие запросы возвращают только строки, для которых предикат фильтра оценивается как true.

Маски столбцов позволяют маскировать значения столбца всякий раз, когда таблица сканирует строки. Будущие запросы для этого столбца возвращают результат вычисляемой функции вместо исходного значения столбца. Дополнительные сведения об использовании фильтров строк и маск столбцов см. в статьях "Фильтры строк" и "Маски столбцов".

Управление фильтрами строк и масками столбцов

Фильтры строк и маски столбцов для материализованных представлений и потоковых таблиц должны быть добавлены, обновлены или удалены с помощью инструкции CREATE OR REFRESH.

Подробный синтаксис определения таблиц с фильтрами строк и масками столбцов см. в справочнике по языку SQL и справочнике по языку Python для Lakeflow Spark Декларативных конвейеров .

Поведение

Ниже приведены важные сведения при использовании фильтров строк или масок столбцов в конвейере:

  • Обновление от имени владельца: когда обновление конвейера обновляет материализованное представление или потоковую таблицу, функции фильтра строк и маски столбцов выполняются с правами владельца конвейера. Это означает, что обновление таблицы использует контекст безопасности пользователя, создавшего конвейер. Функции, которые проверяют контекст пользователя (например CURRENT_USER , и IS_MEMBER) оцениваются с помощью контекста пользователя владельца конвейера.
  • Запрос. При запросе материализованного представления или потоковой таблицы функции, проверяющие контекст пользователя (например CURRENT_USER , и IS_MEMBER) оцениваются с помощью контекста пользователя вызывающего объекта. Этот подход применяет элементы управления безопасностью и доступом для определенных пользователей на основе контекста текущего пользователя.
  • При создании материализованных представлений по исходным таблицам, содержащим фильтры строк и маски столбцов, обновление материализованного представления всегда является полным обновлением. Полное обновление повторно обрабатывает все данные, доступные в источнике с помощью последних определений. Этот процесс проверяет, что политики безопасности на исходных таблицах оцениваются и применяются с использованием самых актуальных данных и определений.

Observability

Используйте DESCRIBE EXTENDED, INFORMATION_SCHEMAили обозреватель каталога, чтобы проверить существующие фильтры строк и маски столбцов, которые применяются к заданному материализованному представлению или потоковой таблице. Эта функция позволяет пользователям проводить аудит и проверку мер доступа к данным и защиты для материализованных представлений и потоковых таблиц.