這很重要
這項功能在下列區域中處於公開預覽狀態:westus、、westus2eastuseastus2centralussouthcentralusnortheuropewesteuropeaustraliaeastbrazilsouth、canadacentralcentralindia、、 southeastasiauksouth
此頁面說明如何建立和管理同步處理數據表。 同步表是 Unity 目錄的唯讀 Postgres 表,能自動將資料從 Unity 目錄表同步到您的 Lakebase 資料庫實例。 將 Unity Catalog 資料表同步至 Postgres 可啟用低延遲讀取查詢,並支援與其他 Postgres 資料表的查詢時聯接。
同步處理是由 Lakeflow Spark 宣告式管線處理。 受管管線會持續使用來源資料表的變更來更新 Postgres 資料表。 建立之後,您可以使用 Postgres 工具直接查詢同步處理數據表。
同步資料表的主要特性如下:
- 為了保持與來源的數據完整性,Postgres 中採用唯讀模式。
- 使用受管理的 Lakeflow Spark 宣告式管道自動同步化
- 可透過標準 PostgreSQL 介面進行查詢
- 透過 Unity Catalog 管理治理與生命週期管理
開始之前
- 您在任何目錄中都有 Unity 目錄資料表。
- 您具有
CAN USE資料庫實例的許可權。
建立同步資料表
UI
若要將 Unity 目錄資料表同步至 Postgres,請執行下列動作:
按一下工作區側邊欄中的目錄。
尋找並選取您要在其上建立同步表的 Unity Catalog 數據表。
按一下 建立>同步處理資料表。
選取您的目錄、架構,然後輸入新同步數據表的數據表名稱。
- 您也可以在 標準目錄中建立同步處理數據表,並設定一些額外的設定。 選取您的標準目錄、架構,然後輸入新建立同步數據表的數據表名稱。
選取資料庫實例,然後輸入要在其中建立同步數據表的 Postgres 資料庫名稱。 Postgres 資料庫欄位預設為目前選取的目標目錄。 如果 Postgres 資料庫在此名稱下不存在,Azure Databricks 會建立新的資料庫。
選取 主鍵。 需要主鍵,因為它可讓您有效率地存取數據列以進行讀取、更新和刪除。
這很重要
主鍵中的資料行在同步處理的資料表中不可為空。 因此,主鍵欄位中具有 Null 的資料列會 從同步中排除。
如果源數據表中有兩個數據列具有相同的主鍵,請選取 [時間索引鍵 ] 來設定重複數據刪除。 指定 Timeseries Key 時,同步處理數據表只會包含每個主鍵具有最新時間索引鍵值的數據列。
選取同步模式:從 快照、觸發 和 連續 中選擇。 如需每種同步模式的詳細資訊,請參閱 同步模式說明。
選擇您是否要從新的或現有的管線建立此同步數據表。
- 如果建立新管線並使用受管目錄,請選擇暫存資料表的儲存位置。 如果使用標準目錄,暫存表會自動儲存在目錄中。
- 如果使用現有的管線,請檢查新的同步模式是否符合管線模式。
(選用)選取 無伺服器預算政策。 若要建立無伺服器預算政策,請參閱 使用無伺服器預算政策來屬性使用量。 這可讓您將計費使用量歸因於特定的使用原則。
- 針對同步處理的資料表,可計費實體是基礎 Lakeflow Spark 宣告式管線。 若要修改預算政策,請修改基礎管線物件。 請參閱 設定無伺服器管線。
同步處理數據表狀態為 [在線] 之後,請登入您的資料庫實例並查詢新建立的數據表。 使用 SQL 編輯器、 外部工具或 筆記本來查詢資料表。
Python SDK
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import SyncedDatabaseTable, SyncedTableSpec, NewPipelineSpec, SyncedTableSchedulingPolicy
# Initialize the Workspace client
w = WorkspaceClient()
# Create a synced table in a database catalog
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="database_catalog.schema.synced_table", # Full three-part name
spec=SyncedTableSpec(
source_table_full_name="source_catalog.source_schema.source_table",
primary_key_columns=["id"], # Primary key columns
scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED, # SNAPSHOT, TRIGGERED, or CONTINUOUS
# Optional: timeseries_key="timestamp" # For deduplication
new_pipeline_spec=NewPipelineSpec(
storage_catalog="storage_catalog",
storage_schema="storage_schema"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Create a synced table in a standard UC catalog
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="standard_catalog.schema.synced_table", # Full three-part name
database_instance_name="my-database-instance", # Required for standard catalogs
logical_database_name="postgres_database", # Required for standard catalogs
spec=SyncedTableSpec(
source_table_full_name="source_catalog.source_schema.source_table",
primary_key_columns=["id"],
scheduling_policy=SyncedTableSchedulingPolicy.CONTINUOUS,
create_database_objects_if_missing=True, # Create database/schema if needed
new_pipeline_spec=NewPipelineSpec(
storage_catalog="storage_catalog",
storage_schema="storage_schema"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Check the status of a synced table
synced_table_name = "database_catalog.schema.synced_table"
status = w.database.get_synced_database_table(name=synced_table_name)
print(f"Synced table status: {status.data_synchronization_status.detailed_state}")
print(f"Status message: {status.data_synchronization_status.message}")
CLI
# Create a synced table in a database catalog
databricks database create-synced-database-table \
--json '{
"spec": {
"name": "database_catalog.schema.synced_table",
"source_table_full_name": "source_catalog.source_schema.source_table",
"primary_key_columns": ["id"],
"scheduling_policy": "TRIGGERED"
},
"new_pipeline_spec": {
"storage_catalog": "storage_catalog",
"storage_schema": "storage_schema"
}
}'
# Create a synced table in a standard UC catalog
# new_pipeline_spec, storage_catalog, and storage_schema are optional
databricks database create-synced-database-table \
--database-instance-name "my-database-instance" \
--logical-database-name "databricks_postgres" \
--json '{
"name": "standard_catalog.schema.synced_table",
"spec": {
"source_table_full_name": "source_catalog.source_schema.source_table",
"primary_key_columns": ["id"],
"scheduling_policy": "CONTINUOUS",
"create_database_objects_if_missing": true
}
}'
# Check the status of a synced table
databricks database get-synced-database-table "database_catalog.schema.synced_table"
curl (Unix指令)
在資料庫目錄中建立同步的數據表。
export CATALOG_NAME=<Database catalog>
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.synced_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
在標準 Unity 目錄目錄中建立同步的數據表。
export CATALOG_NAME=<Standard catalog>
export DATABASE_INSTANCE=<database instance>
export POSTGRES_DATABASE=$CATALOG_NAME
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.sync_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"database_instance_name": "$DATABASE_INSTANCE",
"logical_database_name": "$POSTGRES_DATABASE",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
檢查同步處理數據表的狀態。
export SYNCEDTABLE='pg_db.silver.sbtest1_online'
curl --request GET \
"https://e2-dogfood.staging.cloud.databricks.com/api/2.0/database/synced_tables/$SYNCEDTABLE" \
--header "Authorization: Bearer dapi..."
同步模式說明
可以使用下列其中一種同步模式來建立同步資料表,這些模式決定了資料如何從來源同步到 Postgres 中的同步資料表:
| 同步模式 | Description | Performance |
|---|---|---|
| 快照 | 管道會執行一次,以擷取源數據表的快照,並將它複製到同步表。 後續的管線運行會將整個來源資料複製到目的地,並以原子方式取代它。 管線可以手動觸發、透過 API 或依排程觸發。 | 此模式的效率比「觸發」或「連續」同步模式高 10 倍,因為它會從頭開始重新建立資料。 如果您要修改來源資料表的 10% 以上,請考慮使用此模式。 |
| 觸發 | 管道會執行一次,以擷取源數據表的快照,並將它複製到同步表。 不同於快照同步模式,當已同步表格重新整理時,僅擷取並應用從上一次管線執行後所做的變更。 累加式重新整理可以手動觸發、透過 API 或依排程觸發。 | 此模式是延遲和成本之間的良好折衷方案,因為它會隨需執行,而且只會套用上次執行以來的變更。 若要將延遲降到最低,請在更新來源資料表後立即執行此管線。 如果您執行此專案的頻率超過每 5 分鐘一次,則可能比連續模式更昂貴,因為每次啟動和停止管線的成本。 |
| 連續的 | 管線會執行一次,以拍攝來源資料表的快照,並將快照複製至同步的資料表,然後管線會持續執行。 來源資料表的後續變更會以累加方式即時套用至同步處理的資料表。 不需要手動重新整理。 | 此模式的延遲最低,但成本較高,因為它是連續運行的。 |
備註
若要支援 觸發 或 連續 同步模式,源數據表必須已啟用 變更數據摘要。 某些來源 (例如檢視) 不支援變更資料摘要,因此只能在快照模式中同步處理。
支援的功能
Databricks 建議只在 Postgres 中針對同步處理的資料表執行下列作業,以防止意外覆寫或資料不一致:
- 唯讀查詢
- 建立索引
- 刪除資料表(在從 Unity 目錄移除同步資料表後釋放空間)
雖然您可以透過其他方式修改此資料表,但它會干擾同步處理管線。
刪除同步處理數據表
若要刪除同步的數據表,您必須從 Unity 目錄刪除它,然後卸除資料庫實例中的數據表。 從 Unity 目錄刪除同步的數據表會取消註冊數據表,並停止任何數據重新整理。 不過,數據表會保留在基礎 Postgres 資料庫中。 若要釋放資料庫實例中的空間,請連線到 實例並使用 DROP TABLE 命令。
UI
- 按一下工作區側邊欄中的目錄。
- 尋找並選取您要刪除的同步資料表。
- 按兩下
>刪除。
- 使用
psql、SQL 編輯器或從筆記本連接到 實例。 - 使用 PostgreSQL 卸除數據表。
DROP TABLE synced_table_database.synced_table_schema.synced_table
Python SDK
from databricks.sdk import WorkspaceClient
# Initialize the Workspace client
w = WorkspaceClient()
# Delete a synced table from UC
synced_table_name = "catalog.schema.synced_table"
w.database.delete_synced_database_table(name=synced_table_name)
print(f"Deleted synced table from UC: {synced_table_name}")
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
CLI
# Delete a synced table from UC
databricks database delete-synced-database-table "catalog.schema.synced_table"
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
curl (Unix指令)
# Delete a synced table from UC
curl -X DELETE --header "Authorization: Bearer ${DATABRICKS_TOKEN}" \
https://$WORKSPACE/api/2.0/database/synced_tables/$SYNCED_TABLE_NAME
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
擁有權和許可權
如果您建立新的 Postgres 資料庫、架構或數據表,Postgres 擁有權會設定如下:
- 如果其 Azure Databricks 登入在 Postgres 中存在為角色,則資料庫、資料結構或資料表的擁有權會指派給建立它們的使用者。 若要在 Postgres 中新增 Azure Databricks 身分識別角色,請參閱 管理 Postgres 角色。
- 否則,擁有權會指派給Postgres中父對象的擁有者(通常是
databricks_superuser)。
管理同步的數據表存取
建立同步表後,databricks_superuser 可以從 Postgres 中同步一個表至 READ。
databricks_superuser有pg_read_all_data,可讓此角色從所有資料表讀取。 它還具有 pg_write_all_data 權限,可讓此角色寫入所有表格。 這意味著 a databricks_superuser 也可以寫入 Postgres 中的同步表。 Lakebase 支援此寫入行為,如果您需要在目標資料表中進行緊急變更。 不過,Databricks 建議改為在來源資料表中進行修正。
databricks_superuser也可以將這些許可權授與其他使用者:GRANT USAGE ON SCHEMA synced_table_schema TO user;GRANT SELECT ON synced_table_name TO user;databricks_superuser可以撤銷這些許可權:REVOKE USAGE ON SCHEMA synced_table_schema FROM user;REVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
管理同步處理數據表作業
databricks_superuser可以管理哪些用戶有權在同步處理數據表上執行特定作業。 同步資料表的支援作業如下:
CREATE INDEXALTER INDEXDROP INDEXDROP TABLE
同步處理數據表會拒絕所有其他 DDL 作業。
若要將這些許可權授與其他使用者, databricks_superuser 必須先在 上 databricks_auth建立擴充功能:
CREATE EXTENSION IF NOT EXISTS databricks_auth;
然後,databricks_superuser可以新增使用者來管理同步資料表:
SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');
databricks_superuser可以將使用者移除出同步處理資料表的管理:
SELECT databricks_synced_table_remove_manager('[table]', '[user]');
databricks_superuser可以檢視所有管理員:
SELECT * FROM databricks_synced_table_managers;
數據類型映射
此類型對應資料表會定義來源 Unity 目錄資料表中的每個 資料類型 如何對應至 Postgres 中的目的地同步處理資料表:
| 來源資料行類型 | Postgres 資料行類型 |
|---|---|
| 比特 | BIGINT |
| 二進位 | 貝塔 |
| 布林值 | BOOLEAN |
| 日期 | DATE |
| 十進制(p,s) | 數值 |
| 雙 | 雙精確度 |
| 浮 | 真的 |
| 智力 | 整數 |
| INTERVAL interval限定詞 | 間 |
| 斯莫林特 | 斯莫林特 |
| 繩子 | 簡訊 |
| 時間戳記 | 具有時區的時間戳 |
| TIMESTAMP_NTZ | 沒有時區的時間戳記 |
| 小因特 | 斯莫林特 |
| 地理(srid) | 不支援 |
| 幾何(srid) | 不支援 |
| ARRAY < 元素類型 > | JSONB |
| MAP < 鍵型,值型 > | JSONB |
| 結構體 < [fieldName : fieldType [NOT NULL][COMMENT str][, ...]] > | JSONB |
| 變體 | JSONB |
| 物體 | 不支援 |
備註
- ARRAY、MAP 和 STRUCT 類型的對應已於 2025 年 5 月變更。 在此之前建立的同步資料表會繼續將這些類型對應至 JSON。
- TIMESTAMP 的對應已於 2025 年 8 月變更。 在此之前建立的同步表格會繼續將它對映至 TIMESTAMP WITHOUT TIME ZONE。
處理無效字元
在 Delta 數據表中STRINGARRAYMAP允許某些字元,例如 null 位元組(0x00),但在 Postgres STRUCT 或TEXTJSONB數據行中則不支援。 因此,將這類數據從 Delta 同步至 Postgres 可能會導致插入失敗併發生錯誤:
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00
org.postgresql.util.PSQLException: ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text.
- 當最上層字串數據行中出現 Null 位元組時,就會發生第一個錯誤,該資料行會直接對應至 Postgres
TEXT。 - 當 Azure Databricks 將包含
STRUCT、ARRAY或MAP的複合類型序列化為JSONB時,若內嵌的字串中出現 Null 位元組,就會發生第二個錯誤。 在串行化期間,所有字串都會轉換成 PostgresTEXT,其中\u0000不允許。
如何解決:
您可以透過下列其中一種方式來解決此問題:
清理字串欄位
在同步處理至 Postgres 之前,請先移除或取代所有字串字段中不支援的字元,包括複雜類型內的字元。
若要從最上層
STRING數據行移除 Null 位元組,請使用 函REPLACE式:SELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table;轉換為二進位檔 (
STRING僅適用於資料列)如果需要保留原始位元組內容,請將受影響的
STRING資料行轉換成BINARY。
限制和考慮
支援的來源資料表
根據同步資料表的同步模式,支援不同類型的來源資料表:
對於快照模式,來源資料表必須支援
SELECT *。 範例包括 Delta 資料表、Iceberg 資料表、檢視、具體化檢視和其他類似類型。對於觸發或連續同步模式,來源資料表 也 必須啟用 變更資料摘要 。
命名和標識碼限制
-
允許的字元: 已同步數據表的 Postgres 資料庫、架構和數據表名稱只能包含英數位元和底線 (
[A-Za-z0-9_]+)。 不支援連字元 (-) 和其他特殊字元。 - 數據行和數據表標識碼: 請避免在來源 Unity Catalog 資料表的數據行或數據表名稱中使用大寫字母或特殊字元。 如果保留,您必須在 Postgres 中參考這些識別碼時加上引號。
效能和可用性
- 同步速度: 將資料同步至已同步的資料表的速度可能會比使用原生 PostgreSQL 用戶端直接寫入資料庫實例的速度慢,因為需要額外的處理。 連續同步模式會以至少 15 秒的間隔,將 Unity Catalog 管理表的數據刷新到已同步資料表。
- 連線使用: 每個資料表同步最多可使用 16 個連線至資料庫實例,這些連線將計入實例的連線限制。
- API 等冪性: 同步的數據表 API 具有等冪性,如果發生錯誤以確保及時的作業,可能需要重試。
-
架構變更:針對
Triggered或Continuous模式中的同步資料表,只有 Unity Catalog 資料表中新增的架構變更(例如新增一個欄)會反映在同步資料表上。 - 重複的索引鍵: 如果源數據表中有兩個數據列具有相同的主鍵,除非您使用 Timeseries 金鑰設定重複資料刪除,否則同步管線會失敗。 不過,使用 Timeseries Key 會受到效能損失。
- 更新速率: 同步處理管線支援每個容量單位每秒大約 1,200 個數據列的連續寫入,而每個 CU 每秒最多 15,000 個數據列的大量寫入。
容量和限制
-
資料表限制:
- 每個來源資料表限制 20 個同步處理資料表。
- 每個數據表同步處理最多可使用16個資料庫連線。
-
大小限制和完整重新整理:
- 如果您完全刷新同步的資料表,在新資料表同步完成之前,不會刪除 Postgres 中舊版本的同步表。 這兩個版本會在重新整理期間暫時計入邏輯資料庫大小限制。
- 個別同步資料表沒有限制,但執行個體中所有資料表的邏輯資料大小總計限制為 2 TB。 不過,如果您需要重新整理,而不是重新建立完整資料表,Databricks 建議不要超過 1 TB。
- 如果 Unity 目錄資料表的未壓縮資料列格式大小超過資料庫執行個體大小限制 (2 TB),同步處理會失敗。 您必須先在 Postgres 中卸除同步的數據表,才能進一步寫入 實例。
目錄整合
- 目錄複製: 在以 Postgres 資料庫為目標的標準目錄中建立同步處理的資料表,也註冊為個別的資料庫目錄,會導致同步處理的資料表出現在標準和資料庫目錄下的 Unity 目錄中。