Поделиться через


Синхронизация данных из таблиц каталога Unity с экземпляром базы данных

Это важно

Эта функция доступна в общедоступной предварительной версии в следующих регионах: westus, westus2eastuseastus2northeuropewesteuropeaustraliaeastbrazilsouthcanadacentralcentralindiacentralussouthcentralus, . southeastasiauksouth

На этой странице описывается создание синхронизированной таблицы и управление ими. Синхронизированная таблица — это таблица Postgres только для чтения каталога Unity, которая автоматически синхронизирует данные из таблицы каталога Unity с экземпляром базы данных Lakebase. Синхронизация таблицы каталога Unity с Postgres обеспечивает низкую задержку чтения запросов и поддерживает соединения во время запроса с другими таблицами Postgres.

Декларативный конвейер управляемой системы Lakeflow обеспечивает синхронизацию, постоянно обновляя таблицу Postgres с изменениями из исходной таблицы. После создания синхронизированные таблицы можно запрашивать непосредственно с помощью средств Postgres.

Ниже приведены ключевые характеристики синхронизированных таблиц:

  • Режим только для чтения в Postgres для сохранения целостности данных исходного источника.
  • Автоматическая синхронизация с помощью управляемых декларативных конвейеров Lakeflow
  • Доступно для выполнения запросов через стандартные интерфейсы PostgreSQL
  • Управляется через Unity Catalog для управления и контроля жизненного цикла

Перед тем как начать

  • У вас есть таблица каталога Unity в любом каталоге.
  • У вас есть CAN USE права доступа к экземпляру базы данных.

Создание синхронизированной таблицы

Пользовательский интерфейс

Чтобы синхронизировать таблицу каталога Unity с Postgres, сделайте следующее:

  1. Щелкните каталог на боковой панели рабочей области.

  2. Найдите и выберите таблицу каталога Unity, в которой вы хотите создать синхронизированную таблицу.

  3. Нажмите кнопку "Создать>синхронизированную таблицу".

  4. Выберите каталог, схему и введите имя таблицы для новой синхронизированной таблицы.

    • Синхронизированные таблицы также можно создавать в стандартных каталогах с некоторой дополнительной конфигурацией. Выберите каталог "Стандартный", схему и введите имя таблицы для только что созданной синхронизированной таблицы.
  5. Выберите экземпляр базы данных и введите имя базы данных Postgres, в которой создается синхронизированная таблица. Поле базы данных Postgres по умолчанию используется для выбранного целевого каталога. Если база данных Postgres не существует под этим именем, Azure Databricks создает новую.

  6. Выберите первичный ключ. Первичный ключ необходим , так как он обеспечивает эффективный доступ к строкам для операций чтения, обновлений и удаления.

  7. Если две строки имеют один и тот же первичный ключ в исходной таблице, выберите Ключ временного ряда, чтобы настроить дедупликацию. Если указан ключ Timeseries , синхронизированные таблицы содержат только строки с последним значением ключа таймерии для каждого первичного ключа.

  8. Выберите режим синхронизации из Снимок, По триггеру и Непрерывный. Для всех режимов синхронизации вся исходная таблица считывается и записывается в Postgres.

    Политика Описание
    Снимок Конвейер данных запускается один раз, чтобы создать моментальный снимок исходной таблицы и скопировать его в синхронизированную таблицу. Последующие изменения исходной таблицы автоматически отражаются в синхронизированной таблице путем создания нового моментального снимка источника и создания новой копии. Содержимое синхронизированной таблицы обновляется атомарно.
    Активировано Конвейер запускается один раз, чтобы создать начальную копию моментального снимка исходной таблицы в синхронизированной таблице. В отличие от режима синхронизации моментальных снимков, при обновлении синхронизированной таблицы только изменения с момента последнего выполнения конвейера извлекаются и применяются к синхронизированной таблице. Добавочное обновление можно активировать вручную или автоматически активировать в соответствии с расписанием.
    Непрерывный Трубопровод работает непрерывно. Последующие изменения исходной таблицы постепенно применяются к синхронизированной таблице в режиме потоковой передачи в реальном времени. Обновление вручную не требуется.

    Замечание

    Для поддержки режима триггерной или непрерывной синхронизации исходная таблица должна включать канал изменений данных.

  9. Выберите, нужно ли создать эту синхронизированную таблицу из нового или существующего конвейера.

    • При создании конвейера и использовании управляемого каталога необходимо выбрать расположение хранилища для промежуточной таблицы. При использовании стандартного каталога промежуточная таблица автоматически хранится в каталоге.
    • Если используется существующий конвейер, убедитесь, что новый режим синхронизации соответствует режиму конвейера.
  10. После того как состояние синхронизированной таблицы находится в Сети, войдите в экземпляр базы данных и запросите только что созданную таблицу. Запросите таблицу с помощью редактора SQL, внешних средств или записных книжек.

завиток

Создайте синхронизированную таблицу в каталоге баз данных.

export CATALOG_NAME=<Database catalog>
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.synced_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"

curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
  "name": "$DEST_TBL",
  "spec": {
    "source_table_full_name": "$SRC_TBL",
    "primary_key_columns": $PKS,
    "scheduling_policy": "TRIGGERED",
  },
  "new_pipeline_spec": {
    "storage_catalog": "$ST_CATALOG",
    "storage_schema": "$ST_SCHEMA",
  }
}
EOF

Создайте синхронизированную таблицу в стандартном каталоге каталога Unity.

export CATALOG_NAME=<Standard catalog>
export DATABASE_INSTANCE=<database instance>
export POSTGRES_DATABASE=$CATALOG_NAME
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.sync_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"

curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
  "name": "$DEST_TBL",
  "database_instance_name": "$DATABASE_INSTANCE",
  "logical_database_name": "$POSTGRES_DATABASE",
  "spec": {
    "source_table_full_name": "$SRC_TBL",
    "primary_key_columns": $PKS,
    "scheduling_policy": "TRIGGERED",
  },
  "new_pipeline_spec": {
    "storage_catalog": "$ST_CATALOG",
    "storage_schema": "$ST_SCHEMA",
  }
}
EOF

Проверьте состояние синхронизированной таблицы.

export SYNCEDTABLE='pg_db.silver.sbtest1_online'

curl --request GET \
  "https://e2-dogfood.staging.cloud.databricks.com/api/2.0/database/synced_tables/$SYNCEDTABLE" \
  --header "Authorization: Bearer dapi..."

Поддерживаемые операции

Только ограниченный набор операций поддерживается на стороне Postgres для синхронизированных таблиц:

  • Запросы только для чтения
  • Создание индексов
  • Удаление таблицы (чтобы освободить место после удаления синхронизированной таблицы из каталога Unity)

Хотя вы можете изменить эту таблицу другими способами, она вмешивается в конвейер синхронизации.

Удаление синхронизированной таблицы

Чтобы удалить синхронизированную таблицу, необходимо удалить ее из каталога Unity, а затем удалить таблицу в экземпляре базы данных. Удаление синхронизированной таблицы из каталога Unity отменяет регистрацию таблицы и останавливает обновления данных. Однако таблица остается в базовой базе данных Postgres. Чтобы освободить место в экземпляре базы данных, подключитесь к экземпляру и используйте команду DROP TABLE.

  1. Щелкните каталог на боковой панели рабочей области.
  2. Найдите и выберите синхронизированную таблицу, которую вы хотите удалить.
  3. Щелкните значок меню Kebab.>Удаление.
  4. Подключитесь к экземпляру с psql, с помощью редактора SQL или записной книжки.
  5. Удалите таблицу с помощью PostgreSQL.
    DROP TABLE synced_table_database.synced_table_schema.synced_table
    

Владение и разрешения

Если вы создаете новую базу данных Postgres, схему или таблицу, свойство Postgres устанавливается следующим образом:

  • Владение назначается пользователю, который создает базу данных, схему или таблицу, если его имя входа Azure Databricks существует как роль в Postgres. Чтобы добавить роль удостоверения Azure Databricks в Postgres, см. статью «Создание и управление ролями Postgres для удостоверений Azure Databricks».
  • В противном случае владение назначается владельцу родительского объекта в Postgres (обычно databricks_superuser).

Управление доступом к синхронизированной таблице

После создания databricks_superuser синхронизированной таблицы можно READ синхронизировать таблицу из Postgres. databricks_superuser имеет pg_read_all_data и pg_write_all_data привилегии:

  • Кроме того, databricks_superuser эти привилегии можно предоставить другим пользователям:

    GRANT USAGE ON SCHEMA synced_table_schema TO user;
    
    GRANT SELECT ON synced_table_name TO user;
    
  • databricks_superuser может отозвать эти привилегии:

    REVOKE USAGE ON SCHEMA synced_table_schema FROM user;
    
    REVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
    

Управление синхронизированными операциями таблицы

databricks_superuser может управлять тем, какие пользователи уполномочены выполнять конкретные операции в синхронизированной таблице. Поддерживаемые операции для синхронизированных таблиц:

  • CREATE INDEX
  • ALTER INDEX
  • DROP INDEX
  • DROP TABLE

Все остальные операции DDL запрещены для синхронизированных таблиц.

Чтобы предоставить этим привилегиям дополнительным пользователям, databricks_superuser сначала необходимо создать расширение в databricks_auth:

CREATE EXTENSION IF NOT EXISTS databricks_auth;

databricks_superuser Затем пользователь может добавить пользователя для управления синхронизированной таблицей:

SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');

databricks_superuser может удалить пользователя из управления синхронизированной таблицей.

SELECT databricks_synced_table_remove_manager('[table]', '[user]');

databricks_superuser может просматривать всех руководителей:

SELECT * FROM databricks_synced_table_managers;

Обработка недопустимых символов

Некоторые символы, такие как байт null (0x00), допускаются в STRING, ARRAYMAPили STRUCT столбцах в таблицах Delta, но не поддерживаются в Postgres TEXT или JSONB столбцах. В результате синхронизация таких данных из Delta в Postgres может привести к сбоям вставки с ошибками:

org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00

org.postgresql.util.PSQLException: ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text.
  • Первая ошибка возникает, когда байт null отображается в строковом столбце верхнего уровня, который сопоставляется непосредственно с Postgres TEXT.
  • Вторая ошибка возникает, когда байт null появляется в строке, вложенной в сложный тип (STRUCT, ARRAY или MAP), который Azure Databricks сериализует как JSONB. Во время сериализации все строки привязываются к Postgres TEXT, где \u0000 запрещено.

Способы устранения:

Эту проблему можно устранить одним из следующих способов:

  • Очистка строковых полей

    Удалите или замените неподдерживаемые символы из всех строковых полей, в том числе из сложных типов, перед синхронизацией с Postgres.

    Чтобы удалить null байты из столбца верхнего уровня STRING , используйте функцию REPLACE :

    SELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table;
    
  • Преобразование в двоичный файл (только для STRING столбцов)

    Если необходимо сохранить необработанное содержимое байтов, преобразуйте затронутые столбцы STRING в BINARY.

Ограничения и рекомендации

Защищаемые типы исходной таблицы

Тип защищаемого объекта исходной таблицы (показан на вкладке Сведения обозревателя каталогов) должен быть одним из поддерживаемых ниже вариантов.

  • TABLE_EXTERNAL
  • TABLE_DELTA
  • TABLE_DELTA_EXTERNAL
  • TABLE_DELTASHARING
  • TABLE_DELTASHARING_MUTABLE
  • TABLE_STREAMING_LIVE_TABLE
  • TABLE_STANDARD
  • TABLE_FEATURE_STORE
  • TABLE_FEATURE_STORE_EXTERNAL
  • TABLE_VIEW
  • TABLE_VIEW_DELTASHARING
  • TABLE_MATERIALIZED_VIEW

Ограничения именования и идентификаторов

  • Допустимые символы: Имена баз данных Postgres, схемы и таблиц для синхронизированных таблиц могут содержать только буквенно-цифровые символы и символы подчеркивания ([A-Za-z0-9_]+). Дефисы (-) и другие специальные символы не поддерживаются.
  • Идентификаторы столбцов и таблиц: Избегайте использования прописных букв или специальных символов в именах столбцов или таблиц в исходной таблице каталога Unity. Если они сохранены, при ссылке на них в Postgres необходимо процитировать эти идентификаторы.

Быстродействие и синхронизация

  • Скорость синхронизации: Синхронизация данных в синхронизированные таблицы может быть медленнее, чем запись данных непосредственно в экземпляр базы данных с собственным клиентом PostgreSQL из-за дополнительной обработки. Режим непрерывной синхронизации обновляет данные из управляемой таблицы каталога Unity в синхронизированную таблицу с минимальным интервалом в 15 секунд.
  • Использование подключения: Каждая синхронизация таблицы может использовать до 16 подключений к экземпляру базы данных, что засчитывается в общее ограничение на подключения для экземпляра.
  • Идемпотентность API: Синхронизированные API таблицы являются идемпотентными и могут потребовать повторного вызова при возникновении ошибок, чтобы обеспечить своевременные операции.
  • Изменения схемы: Для синхронизированных таблиц в Triggered или Continuous режиме, только добавочные изменения схемы (например, добавление нового столбца) из таблиц Unity Catalog отражаются в синхронизированной таблице.
  • Повторяющиеся ключи: Если две строки имеют один и тот же первичный ключ в исходной таблице, конвейер синхронизации завершается ошибкой, если не настроить дедупликацию с помощью ключа Timeseries. Однако использование ключа Timeseries связано с снижением производительности.
  • Частота обновления: Конвейер синхронизации поддерживает непрерывную запись примерно в 1200 строк в секунду на единицу емкости (CU) и массовую запись до 15 000 строк в секунду на единицу емкости (CU).

Емкость и ограничения

  • Ограничения таблицы:
    • Ограничение в 20 синхронизированных таблиц на исходную таблицу.
    • Каждая синхронизация таблиц может использовать до 16 подключений к базе данных.
  • Ограничения размера и полное обновление:
    • Если вы полностью обновите синхронизированную таблицу, старая версия в Postgres не удаляется, пока новая таблица не будет синхронизирована. Обе версии временно засчитываются в счет лимита размера логической базы данных во время обновления.
    • Каждая синхронизированная таблица может составлять до 2 ТБ. Если требуется обновление вместо полного восстановления таблицы, ограничение составляет 1 ТБ.
    • Если несжатый размер таблицы в строковом формате каталога Unity превышает размер экземпляра базы данных (2 ТБ), синхронизация завершится неудачей. Перед записью в инстанс необходимо удалить синхронизированную таблицу в Postgres.

Интеграция с каталогом

  • Дублирование каталога: Создание синхронизированной таблицы в стандартном каталоге, предназначенной для базы данных Postgres, которая также зарегистрирована в качестве отдельного каталога базы данных, приведет к отображению синхронизированной таблицы в каталоге Unity как в стандартном, так и в каталогах баз данных.