Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Это важно
Эта функция доступна в общедоступной предварительной версии в следующих регионах: westus, westus2eastuseastus2centralussouthcentralusnortheuropewesteuropeaustraliaeastbrazilsouthcanadacentralcentralindia, . southeastasiauksouth
Обратный ETL с подготовленным Lakebase позволяет синхронизировать данные из таблиц каталога Unity в Postgres путем создания синхронизированных таблиц. На этой странице описывается создание синхронизированных таблиц и управление ими в Lakebase Provisioned.
Что такое синхронизированная таблица?
Синхронизированная таблица — это таблица Postgres только для чтения каталога Unity, которая автоматически синхронизирует данные из таблицы каталога Unity с экземпляром базы данных Lakebase. Синхронизация таблицы каталога Unity с Postgres обеспечивает низкую задержку чтения запросов и поддерживает соединения во время запроса с другими таблицами Postgres.
Синхронизация обрабатывается декларативными конвейерами Spark Lakeflow. Управляемый конвейер постоянно обновляет таблицу Postgres с изменениями из исходной таблицы. После создания синхронизированные таблицы можно запрашивать непосредственно с помощью средств Postgres.
Ниже приведены ключевые характеристики синхронизированных таблиц:
- Режим только для чтения в Postgres для сохранения целостности данных исходного источника.
- Автоматически синхронизирован с помощью управляемых Lakeflow Spark декларативных конвейеров
- Доступно для выполнения запросов через стандартные интерфейсы PostgreSQL
- Управляется через Unity Catalog для управления и контроля жизненного цикла
Перед тем как начать
- У вас есть таблица каталога Unity в любом каталоге.
- У вас есть
CAN USEправа доступа к экземпляру базы данных.
Создание синхронизированной таблицы
Пользовательский интерфейс
Чтобы синхронизировать таблицу каталога Unity с Postgres, сделайте следующее:
Щелкните каталог на боковой панели рабочей области.
Найдите и выберите таблицу каталога Unity, в которой вы хотите создать синхронизированную таблицу.
Нажмите кнопку "Создать>синхронизированную таблицу".
Выберите каталог, схему и введите имя таблицы для новой синхронизированной таблицы.
- Синхронизированные таблицы также можно создавать в стандартных каталогах с некоторой дополнительной конфигурацией. Выберите каталог "Стандартный", схему и введите имя таблицы для только что созданной синхронизированной таблицы.
Выберите экземпляр базы данных и введите имя базы данных Postgres, в которой создается синхронизированная таблица. Поле базы данных Postgres по умолчанию используется для выбранного целевого каталога. Если база данных Postgres не существует под этим именем, Azure Databricks создает новую.
Выберите первичный ключ. Первичный ключ необходим , так как он обеспечивает эффективный доступ к строкам для операций чтения, обновлений и удаления.
Это важно
Столбцы в первичном ключе не могут принимать значение NULL в синхронизированной таблице. Поэтому строки со значениями NULL в столбцах первичного ключа исключаются из синхронизации.
Если две строки имеют один и тот же первичный ключ в исходной таблице, выберите Ключ временного ряда, чтобы настроить дедупликацию. Если указан ключ Timeseries , синхронизированные таблицы содержат только строки с последним значением ключа таймерии для каждого первичного ключа.
Выберите режим синхронизации из Снимок, По триггеру и Непрерывный. Дополнительные сведения о каждом режиме синхронизации см. в разделе "Режимы синхронизации".
Выберите, нужно ли создать эту синхронизированную таблицу из нового или существующего конвейера.
- При создании конвейера и использовании управляемого каталога выберите расположение хранилища для промежуточной таблицы. При использовании стандартного каталога промежуточная таблица автоматически хранится в каталоге.
- Если используется существующий конвейер, убедитесь, что новый режим синхронизации соответствует режиму конвейера.
(Необязательно) Выберите бессерверную политику бюджета. Сведения о создании бессерверной бюджетной политики см. в разделе "Использование атрибутов с бессерверными политиками бюджета". Это позволяет атрибутировать использование выставления счетов определенным политикам использования.
- Для синхронизированных таблиц оплачиваемый объект — это базовый конвейер декларативных конвейеров Lakeflow Spark. Чтобы изменить политику бюджета, измените базовый объект конвейера. См. раздел "Настройка бессерверного конвейера".
После того как состояние синхронизированной таблицы находится в Сети, войдите в экземпляр базы данных и запросите только что созданную таблицу. Запросите таблицу с помощью редактора SQL, внешних средств или записных книжек.
Пакет SDK для Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import SyncedDatabaseTable, SyncedTableSpec, NewPipelineSpec, SyncedTableSchedulingPolicy
# Initialize the Workspace client
w = WorkspaceClient()
# Create a synced table in a database catalog
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="database_catalog.schema.synced_table", # Full three-part name
spec=SyncedTableSpec(
source_table_full_name="source_catalog.source_schema.source_table",
primary_key_columns=["id"], # Primary key columns
scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED, # SNAPSHOT, TRIGGERED, or CONTINUOUS
# Optional: timeseries_key="timestamp" # For deduplication
new_pipeline_spec=NewPipelineSpec(
storage_catalog="storage_catalog",
storage_schema="storage_schema"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Create a synced table in a standard UC catalog
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="standard_catalog.schema.synced_table", # Full three-part name
database_instance_name="my-database-instance", # Required for standard catalogs
logical_database_name="postgres_database", # Required for standard catalogs
spec=SyncedTableSpec(
source_table_full_name="source_catalog.source_schema.source_table",
primary_key_columns=["id"],
scheduling_policy=SyncedTableSchedulingPolicy.CONTINUOUS,
create_database_objects_if_missing=True, # Create database/schema if needed
new_pipeline_spec=NewPipelineSpec(
storage_catalog="storage_catalog",
storage_schema="storage_schema"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Check the status of a synced table
synced_table_name = "database_catalog.schema.synced_table"
status = w.database.get_synced_database_table(name=synced_table_name)
print(f"Synced table status: {status.data_synchronization_status.detailed_state}")
print(f"Status message: {status.data_synchronization_status.message}")
интерфейс командной строки (CLI)
# Create a synced table in a database catalog
databricks database create-synced-database-table \
--json '{
"spec": {
"name": "database_catalog.schema.synced_table",
"source_table_full_name": "source_catalog.source_schema.source_table",
"primary_key_columns": ["id"],
"scheduling_policy": "TRIGGERED"
},
"new_pipeline_spec": {
"storage_catalog": "storage_catalog",
"storage_schema": "storage_schema"
}
}'
# Create a synced table in a standard UC catalog
# new_pipeline_spec, storage_catalog, and storage_schema are optional
databricks database create-synced-database-table \
--database-instance-name "my-database-instance" \
--logical-database-name "databricks_postgres" \
--json '{
"name": "standard_catalog.schema.synced_table",
"spec": {
"source_table_full_name": "source_catalog.source_schema.source_table",
"primary_key_columns": ["id"],
"scheduling_policy": "CONTINUOUS",
"create_database_objects_if_missing": true
}
}'
# Check the status of a synced table
databricks database get-synced-database-table "database_catalog.schema.synced_table"
завиток
Создайте синхронизированную таблицу в каталоге баз данных.
export CATALOG_NAME=<Database catalog>
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.synced_table"
export PKS='["id"]'
export ST_CATALOG="storage_catalog"
export ST_SCHEMA="storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
Создайте синхронизированную таблицу в стандартном каталоге каталога Unity.
export CATALOG_NAME=<Standard catalog>
export DATABASE_INSTANCE=<database instance>
export POSTGRES_DATABASE=$CATALOG_NAME
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.sync_table"
export PKS='["id"]'
export ST_CATALOG="storage_catalog"
export ST_SCHEMA="storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"database_instance_name": "$DATABASE_INSTANCE",
"logical_database_name": "$POSTGRES_DATABASE",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
Проверьте состояние синхронизированной таблицы.
export SYNCEDTABLE='pg_db.silver.sbtest1_online'
curl --request GET \
"https://e2-dogfood.staging.cloud.databricks.com/api/2.0/database/synced_tables/$SYNCEDTABLE" \
--header "Authorization: Bearer dapi..."
Описанные режимы синхронизации
Синхронизированную таблицу можно создать с помощью одного из следующих режимов синхронизации, которые определяют, как данные синхронизируются из источника с синхронизированной таблицей в Postgres:
| Режим синхронизации | Description | Performance |
|---|---|---|
| Снимок | Конвейер данных запускается один раз, чтобы создать моментальный снимок исходной таблицы и скопировать его в синхронизированную таблицу. Последующие запуски конвейера копируют все исходные данные в место назначения и полностью заменяют их на месте атомарным образом. Конвейер можно активировать вручную, через API или по расписанию. | Этот режим более 10 раз эффективнее, чем режимы активации или непрерывной синхронизации, так как они создают данные с нуля. Если вы изменяете более 10% исходной таблицы, рассмотрите возможность использования этого режима. |
| Активировано | Конвейер данных запускается один раз, чтобы создать моментальный снимок исходной таблицы и скопировать его в синхронизированную таблицу. В отличие от режима синхронизации моментальных снимков, при обновлении синхронизированной таблицы только изменения с момента последнего выполнения конвейера извлекаются и применяются к синхронизированной таблице. Добавочное обновление можно активировать вручную, через API или по расписанию. | Этот режим является хорошим компромиссом между задержкой и затратами, так как он выполняется по запросу и применяет изменения только с момента последнего запуска. Чтобы свести к минимуму задержку, запустите этот конвейер сразу после обновления исходной таблицы. Если вы работаете чаще, чем каждые 5 минут, это может быть дороже, чем непрерывный режим из-за затрат на запуск и остановку конвейера каждый раз. |
| Непрерывный | Конвейер выполняется один раз, чтобы создать моментальный снимок исходной таблицы и скопировать его в синхронизированную таблицу, а затем конвейер выполняется непрерывно. Последующие изменения исходной таблицы постепенно применяются к синхронизированной таблице в режиме реального времени. Обновление вручную не требуется. | Этот режим имеет наименьшую задержку, но более высокую стоимость, так как она постоянно выполняется. |
Замечание
Для поддержки режима триггерной или непрерывной синхронизации исходная таблица должна включать канал изменений данных. Некоторые источники (например, представления) не поддерживают поток изменений данных, поэтому их можно синхронизировать только в режиме моментального снимка.
Поддерживаемые операции
Databricks рекомендует выполнять только следующие операции в Postgres для синхронизированных таблиц, чтобы предотвратить случайные перезаписи или несоответствия данных:
- Запросы только для чтения
- Создание индексов
- Удаление таблицы (чтобы освободить место после удаления синхронизированной таблицы из каталога Unity)
Хотя вы можете изменить эту таблицу другими способами, она вмешивается в конвейер синхронизации.
Удаление синхронизированной таблицы
Чтобы удалить синхронизированную таблицу, необходимо удалить ее из каталога Unity, а затем удалить таблицу в экземпляре базы данных. Удаление синхронизированной таблицы из каталога Unity отменяет регистрацию таблицы и останавливает обновления данных. Однако таблица остается в базовой базе данных Postgres. Чтобы освободить место в экземпляре базы данных, подключитесь к экземпляру и используйте команду DROP TABLE.
Пользовательский интерфейс
- Щелкните каталог на боковой панели рабочей области.
- Найдите и выберите синхронизированную таблицу, которую вы хотите удалить.
- Щелкните
>Удаление.
- Подключитесь к экземпляру с
psql, с помощью редактора SQL или записной книжки. - Удалите таблицу с помощью PostgreSQL.
DROP TABLE synced_table_database.synced_table_schema.synced_table
Пакет SDK для Python
from databricks.sdk import WorkspaceClient
# Initialize the Workspace client
w = WorkspaceClient()
# Delete a synced table from UC
synced_table_name = "catalog.schema.synced_table"
w.database.delete_synced_database_table(name=synced_table_name)
print(f"Deleted synced table from UC: {synced_table_name}")
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
интерфейс командной строки (CLI)
# Delete a synced table from UC
databricks database delete-synced-database-table "catalog.schema.synced_table"
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
завиток
# Delete a synced table from UC
curl -X DELETE --header "Authorization: Bearer ${DATABRICKS_TOKEN}" \
https://$WORKSPACE/api/2.0/database/synced_tables/$SYNCED_TABLE_NAME
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
Владение и разрешения
Если вы создаете новую базу данных Postgres, схему или таблицу, свойство Postgres устанавливается следующим образом:
- Владение назначается пользователю, который создает базу данных, схему или таблицу, если его имя входа Azure Databricks существует как роль в Postgres. Чтобы добавить роль удостоверения Azure Databricks в Postgres, см. статью "Управление ролями Postgres".
- В противном случае владение назначается владельцу родительского объекта в Postgres (обычно
databricks_superuser).
Управление доступом к синхронизированной таблице
После создания databricks_superuser синхронизированной таблицы можно READ синхронизировать таблицу из Postgres. Этот databricks_superuserpg_read_all_dataпараметр позволяет считывать эту роль из всех таблиц. Он также имеет привилегию pg_write_all_data , которая позволяет этой роли записывать все таблицы. Это означает, что databricks_superuser можно также записывать в синхронизированную таблицу в Postgres. Lakebase поддерживает это поведение записи в случае, если необходимо внести срочные изменения в целевую таблицу. Однако Databricks рекомендует вносить исправления в исходную таблицу.
Кроме того,
databricks_superuserэти привилегии можно предоставить другим пользователям:GRANT USAGE ON SCHEMA synced_table_schema TO user;GRANT SELECT ON synced_table_name TO user;databricks_superuserможет отозвать эти привилегии:REVOKE USAGE ON SCHEMA synced_table_schema FROM user;REVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
Управление синхронизированными операциями таблицы
databricks_superuser может управлять тем, какие пользователи уполномочены выполнять конкретные операции в синхронизированной таблице. Поддерживаемые операции для синхронизированных таблиц:
CREATE INDEXALTER INDEXDROP INDEXDROP TABLE
Все остальные операции DDL запрещены для синхронизированных таблиц.
Чтобы предоставить этим привилегиям дополнительным пользователям, databricks_superuser сначала необходимо создать расширение в databricks_auth:
CREATE EXTENSION IF NOT EXISTS databricks_auth;
databricks_superuser Затем пользователь может добавить пользователя для управления синхронизированной таблицей:
SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');
databricks_superuser может удалить пользователя из управления синхронизированной таблицей.
SELECT databricks_synced_table_remove_manager('[table]', '[user]');
databricks_superuser может просматривать всех руководителей:
SELECT * FROM databricks_synced_table_managers;
Сопоставление типов данных
Эта таблица сопоставления типов определяет, как каждый тип данных в исходной таблице каталога Unity сопоставляется с целевой таблицей синхронизации в Postgres:
| Тип исходного столбца | Тип столбца Postgres |
|---|---|
| BIGINT | БИГИНТ |
| ДВОИЧНЫЙ | BYTEA |
| БУЛЕВ | BOOLEAN |
| ДАТА | DATE |
| DECIMAL(p,s) | ЧИСЛОВОЙ |
| ДВОЙНОЙ | ДВОЙНАЯ ТОЧНОСТЬ |
| ПЛАВАТЬ | РЕАЛЬНЫЙ |
| INT | ЦЕЛОЕ ЧИСЛО |
| Интервал INTERVALQualifier | ИНТЕРВАЛ |
| SMALLINT | СМОЛЛИНТ |
| СТРУНА | ТЕКСТ |
| TIMESTAMP | Метка времени с часовым поясом |
| TIMESTAMP_NTZ | МЕТКА ВРЕМЕНИ БЕЗ ЧАСОВОГО ПОЯСА |
| TINYINT | СМОЛЛИНТ |
| GEOGRAPHY(srid) | НЕ ПОДДЕРЖИВАЕТСЯ |
| Geometry(srid) | НЕ ПОДДЕРЖИВАЕТСЯ |
| Элемент ARRAYType <> | JSONB |
| MAP < keyType,valueType > | JSONB |
| STRUCT < [fieldName: fieldType [NOT NULL][COMMENT str][, ...]] > | JSONB |
| ВАРИАНТ | JSONB |
| ОБЪЕКТ | НЕ ПОДДЕРЖИВАЕТСЯ |
Замечание
- Сопоставление типов ARRAY, MAP и STRUCT было изменено в мае 2025 года. Синхронизировать таблицы, созданные до этого, чтобы сопоставить эти типы с JSON.
- Сопоставление для TIMESTAMP было изменено в августе 2025 года. Таблицы синхронизации, созданные до этого, будут сопоставляться с TIMESTAMP БЕЗ ЧАСОВОГО ПОЯСа.
Обработка недопустимых символов
Некоторые символы, такие как байт null (0x00), допускаются в STRING, ARRAYMAPили STRUCT столбцах в таблицах Delta, но не поддерживаются в Postgres TEXT или JSONB столбцах. В результате синхронизация таких данных из Delta в Postgres может привести к сбоям вставки с ошибками:
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00
org.postgresql.util.PSQLException: ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text.
- Первая ошибка возникает, когда байт null отображается в строковом столбце верхнего уровня, который сопоставляется непосредственно с Postgres
TEXT. - Вторая ошибка возникает, когда байт null появляется в строке, вложенной в сложный тип (
STRUCT,ARRAYилиMAP), который Azure Databricks сериализует какJSONB. Во время сериализации все строки привязываются к PostgresTEXT, где\u0000запрещено.
Способы устранения:
Эту проблему можно устранить одним из следующих способов:
Очистка строковых полей
Удалите или замените неподдерживаемые символы из всех строковых полей, в том числе из сложных типов, перед синхронизацией с Postgres.
Чтобы удалить null байты из столбца верхнего уровня
STRING, используйте функциюREPLACE:SELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table;Преобразование в двоичный файл (только для
STRINGстолбцов)Если необходимо сохранить необработанное содержимое байтов, преобразуйте затронутые столбцы
STRINGвBINARY.
Ограничения и рекомендации
Поддерживаемые исходные таблицы
В зависимости от режима синхронизации синхронизированной таблицы поддерживаются различные типы исходных таблиц:
Для режима моментального снимка исходная таблица должна поддерживаться
SELECT *. Примеры включают разностные таблицы, таблицы Айсберга, представления, материализованные представления и другие аналогичные типы.Для режимов активации или непрерывной синхронизации исходная таблица также должна включать канал изменений.
Ограничения именования и идентификаторов
-
Допустимые символы: Имена баз данных Postgres, схемы и таблиц для синхронизированных таблиц могут содержать только буквенно-цифровые символы и символы подчеркивания (
[A-Za-z0-9_]+). Дефисы (-) и другие специальные символы не поддерживаются. - Идентификаторы столбцов и таблиц: Избегайте использования прописных букв или специальных символов в именах столбцов или таблиц в исходной таблице каталога Unity. Если они сохранены, при ссылке на них в Postgres необходимо процитировать эти идентификаторы.
Быстродействие и синхронизация
- Скорость синхронизации: Синхронизация данных в синхронизированные таблицы может быть медленнее, чем запись данных непосредственно в экземпляр базы данных с собственным клиентом PostgreSQL из-за дополнительной обработки. Режим непрерывной синхронизации обновляет данные из управляемой таблицы каталога Unity в синхронизированную таблицу с минимальным интервалом в 15 секунд.
- Использование подключения: Каждая синхронизация таблицы может использовать до 16 подключений к экземпляру базы данных, что засчитывается в общее ограничение на подключения для экземпляра.
- Идемпотентность API: Синхронизированные API таблицы являются идемпотентными и могут потребовать повторного вызова при возникновении ошибок, чтобы обеспечить своевременные операции.
-
Изменения схемы: Для синхронизированных таблиц в
TriggeredилиContinuousрежиме, только добавочные изменения схемы (например, добавление нового столбца) из таблиц Unity Catalog отражаются в синхронизированной таблице. - Повторяющиеся ключи: Если две строки имеют один и тот же первичный ключ в исходной таблице, конвейер синхронизации завершается ошибкой, если не настроить дедупликацию с помощью ключа таймерии. Однако использование ключа временного ряда влечет за собой снижение производительности.
- Частота обновления: Конвейер синхронизации поддерживает непрерывную запись примерно в 1200 строк в секунду на единицу емкости (CU) и массовую запись до 15 000 строк в секунду на единицу емкости (CU).
Емкость и ограничения
-
Ограничения таблицы:
- Ограничение в 20 синхронизированных таблиц на исходную таблицу.
- Каждая синхронизация таблиц может использовать до 16 подключений к базе данных.
-
Ограничения размера и полное обновление:
- Если вы полностью обновите синхронизированную таблицу, старая версия в Postgres не удаляется, пока новая таблица не будет синхронизирована. Обе версии временно засчитываются в счет лимита размера логической базы данных во время обновления.
- Отдельные синхронизированные таблицы не имеют ограничения, но общий предел размера логических данных во всех таблицах в экземпляре составляет 2 ТБ. Однако если требуется обновление вместо полного воссоздания таблицы, Databricks рекомендует не превышать 1 ТБ.
- Если необработанный размер в строковом формате таблицы каталога Unity превышает установленные ограничения на размер экземпляра базы данных (2 ТБ), происходит сбой синхронизации. Перед записью в инстанс необходимо удалить синхронизированную таблицу в Postgres.
Интеграция с каталогом
- Дублирование каталога: Создание синхронизированной таблицы в стандартном каталоге, предназначенной для базы данных Postgres, которая также зарегистрирована в качестве отдельного каталога базы данных, приводит к тому, что синхронизированная таблица будет отображаться в каталоге Unity как в стандартном, так и в каталогах баз данных.