Important
この機能は、、westus、westus2、eastus、eastus2、centralus、southcentralus、northeurope、westeurope、australiaeast、brazilsouth、canadacentral、centralindia、southeastasiaの各リージョンのuksouth段階にあります。
このページでは、同期されたテーブルを作成および管理する方法について説明します。 同期テーブルは、Unity カタログ テーブルから Lakebase データベース インスタンスにデータを自動的に同期する、Unity カタログの読み取り専用 Postgres テーブルです。 Unity カタログ テーブルを Postgres に同期すると、待機時間の短い読み取りクエリが可能になり、他の Postgres テーブルとのクエリ時間の結合がサポートされます。
同期は、Lakeflow Spark 宣言パイプラインによって処理されます。 マネージド パイプラインは、ソース テーブルからの変更で Postgres テーブルを継続的に更新します。 作成後、同期されたテーブルは Postgres ツールを使用して直接照会できます。
同期テーブルの主な特性は次のとおりです。
- ソースとのデータ整合性を維持するために、Postgres を読み取り専用に設定
- マネージド Lakeflow Spark 宣言パイプラインを使用して自動的に同期する
- 標準の PostgreSQL インターフェイスを介してクエリ可能
- ガバナンスとライフサイクル管理のために Unity カタログを介して管理
開始する前に
- 任意のカタログに Unity カタログ テーブルがあります。
- データベース インスタンスに対する
CAN USE権限があります。
同期テーブルを作成する
UI
Unity カタログ テーブルを Postgres に同期するには、次の操作を行います。
ワークスペースのサイドバーで [ カタログ ] をクリックします。
同期テーブルを作成する Unity カタログ テーブルを見つけて選択します。
[ 作成>同期テーブル] をクリックします。
カタログ、スキーマを選択し、新しい同期テーブルのテーブル名を入力します。
- 同期テーブルは、いくつかの追加構成を使用して Standard カタログに作成することもできます。 標準カタログ、スキーマを選択し、新しく作成した同期テーブルのテーブル名を入力します。
データベース インスタンスを選択し、同期テーブルを作成する Postgres データベースの名前を入力します。 Postgres データベース フィールドの既定値は、現在選択されているターゲット カタログです。 この名前の下に Postgres データベースが存在しない場合、Azure Databricks によって新しいデータベースが作成されます。
主キーを選択します。 主キーは、読み取り、更新、および削除の行への効率的なアクセスを可能にするため、 必要 です。
Important
同期されたテーブルでは、主キーの列は null 許容ではありません。 したがって、主キー列に null が含まれる行は 同期から除外されます。
ソース テーブル内の 2 つの行に同じ主キーがある場合は、 Timeseries キー を選択して重複除去を構成します。 時系列キーを指定すると、同期されたテーブルには、各主キーの最新の時系列キー値を持つ行のみが含まれます。
[スナップショット]、[トリガー済み]、[継続的] から同期モードを選択します。 各同期モードの詳細については、説明 されている同期モードを参照してください。
この同期テーブルを新規または既存のパイプラインから作成するかどうかを選択します。
- 新しいパイプラインを作成し、マネージド カタログを使用する場合は、ステージング テーブルのストレージの場所を選択します。 標準カタログを使用している場合、ステージング テーブルはカタログに自動的に格納されます。
- 既存のパイプラインを使用している場合は、新しい同期モードがパイプライン モードと一致することを確認します。
(省略可能) サーバーレス予算ポリシーを選択します。 サーバーレス予算ポリシーを作成するには、「サーバーレス予算ポリシー を使用した属性の使用」を参照してください。 これにより、課金の使用状況を特定の使用状況ポリシーに属性付けできます。
- 同期テーブルの場合、課金対象エンティティは基になる Lakeflow Spark 宣言型パイプラインです。 予算ポリシーを変更するには、基になるパイプライン オブジェクトを変更します。 サーバーレス パイプラインの構成を参照してください。
同期されたテーブルの状態がオンラインになった後、データベース インスタンスにログインし、新しく作成されたテーブルに対してクエリを実行します。 SQL エディター、外部ツール、またはノートブックを使用してテーブルのクエリを実行します。
Python SDK
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import SyncedDatabaseTable, SyncedTableSpec, NewPipelineSpec, SyncedTableSchedulingPolicy
# Initialize the Workspace client
w = WorkspaceClient()
# Create a synced table in a database catalog
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="database_catalog.schema.synced_table", # Full three-part name
spec=SyncedTableSpec(
source_table_full_name="source_catalog.source_schema.source_table",
primary_key_columns=["id"], # Primary key columns
scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED, # SNAPSHOT, TRIGGERED, or CONTINUOUS
# Optional: timeseries_key="timestamp" # For deduplication
new_pipeline_spec=NewPipelineSpec(
storage_catalog="storage_catalog",
storage_schema="storage_schema"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Create a synced table in a standard UC catalog
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="standard_catalog.schema.synced_table", # Full three-part name
database_instance_name="my-database-instance", # Required for standard catalogs
logical_database_name="postgres_database", # Required for standard catalogs
spec=SyncedTableSpec(
source_table_full_name="source_catalog.source_schema.source_table",
primary_key_columns=["id"],
scheduling_policy=SyncedTableSchedulingPolicy.CONTINUOUS,
create_database_objects_if_missing=True, # Create database/schema if needed
new_pipeline_spec=NewPipelineSpec(
storage_catalog="storage_catalog",
storage_schema="storage_schema"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Check the status of a synced table
synced_table_name = "database_catalog.schema.synced_table"
status = w.database.get_synced_database_table(name=synced_table_name)
print(f"Synced table status: {status.data_synchronization_status.detailed_state}")
print(f"Status message: {status.data_synchronization_status.message}")
CLI
# Create a synced table in a database catalog
databricks database create-synced-database-table \
--json '{
"spec": {
"name": "database_catalog.schema.synced_table",
"source_table_full_name": "source_catalog.source_schema.source_table",
"primary_key_columns": ["id"],
"scheduling_policy": "TRIGGERED"
},
"new_pipeline_spec": {
"storage_catalog": "storage_catalog",
"storage_schema": "storage_schema"
}
}'
# Create a synced table in a standard UC catalog
# new_pipeline_spec, storage_catalog, and storage_schema are optional
databricks database create-synced-database-table \
--database-instance-name "my-database-instance" \
--logical-database-name "databricks_postgres" \
--json '{
"name": "standard_catalog.schema.synced_table",
"spec": {
"source_table_full_name": "source_catalog.source_schema.source_table",
"primary_key_columns": ["id"],
"scheduling_policy": "CONTINUOUS",
"create_database_objects_if_missing": true
}
}'
# Check the status of a synced table
databricks database get-synced-database-table "database_catalog.schema.synced_table"
curl
データベース カタログに同期テーブルを作成します。
export CATALOG_NAME=<Database catalog>
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.synced_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
標準の Unity カタログ カタログに同期テーブルを作成します。
export CATALOG_NAME=<Standard catalog>
export DATABASE_INSTANCE=<database instance>
export POSTGRES_DATABASE=$CATALOG_NAME
export SRC_TBL="source_catalog.source_schema.source_table"
export DEST_TBL="$CATALOG_NAME.some_schema.sync_table"
export PKS='["id"]'
export ST_CATALOG = "storage_catalog"
export ST_SCHEMA = "storage_schema"
curl -X POST https://$WORKSPACE/api/2.0/database/synced_tables \
-H "Content-Type: text/json" \
-H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data-binary @- << EOF
{
"name": "$DEST_TBL",
"database_instance_name": "$DATABASE_INSTANCE",
"logical_database_name": "$POSTGRES_DATABASE",
"spec": {
"source_table_full_name": "$SRC_TBL",
"primary_key_columns": $PKS,
"scheduling_policy": "TRIGGERED",
},
"new_pipeline_spec": {
"storage_catalog": "$ST_CATALOG",
"storage_schema": "$ST_SCHEMA",
}
}
EOF
同期されたテーブルの状態を確認します。
export SYNCEDTABLE='pg_db.silver.sbtest1_online'
curl --request GET \
"https://e2-dogfood.staging.cloud.databricks.com/api/2.0/database/synced_tables/$SYNCEDTABLE" \
--header "Authorization: Bearer dapi..."
同期モードの説明
同期テーブルは、次のいずれかの同期モードで作成できます。これは、Postgres でソースから同期されたテーブルへのデータの同期方法を決定します。
| 同期モード | Description | Performance |
|---|---|---|
| スナップショット | パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、同期されたテーブルにコピーします。 後続のパイプライン実行では、ソース データ全体をコピー先にコピーし、元のデータをアトミックに一括して置き換えます。 パイプラインは、API を介して、またはスケジュールに従って、手動でトリガーできます。 | このモードは、ゼロからデータを再作成するため、トリガーモードまたは継続的同期モードの 10 倍効率的です。 ソース テーブルの 10% を超える変更を行う場合は、このモードの使用を検討してください。 |
| トリガー済み | パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、同期されたテーブルにコピーします。 スナップショット同期モードとは異なり、同期されたテーブルが更新されると、最後のパイプライン実行以降にのみ変更が取得され、同期されたテーブルに適用されます。 増分更新は、API を介して、またはスケジュールに従って、手動でトリガーできます。 | このモードは、必要に応じて実行され、前回の実行以降にのみ変更が適用されるため、ラグとコストの間の良好な妥協点です。 ラグを最小限に抑えるには、ソース テーブルを更新した直後にこのパイプラインを実行します。 これを 5 分ごとに実行する場合は、パイプラインを毎回開始および停止するコストが原因で、継続的モードよりもコストが高くなる可能性があります。 |
| 継続的 | パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、同期されたテーブルにコピーした後、パイプラインが継続的に実行されます。 ソース テーブルに対する後続の変更は、同期されたテーブルにリアルタイムで増分的に適用されます。 手動更新は必要ありません。 | このモードは継続的に実行されるため、ラグは最も低くなりますが、コストは高くなります。 |
注
[トリガー] または [連続] の同期モードをサポートするには、ソース テーブルで [データ フィードの変更] を有効にする必要があります。 特定のソース (ビューなど) では変更データ フィードがサポートされていないため、スナップショット モードでのみ同期できます。
サポートされている操作
Databricks では、誤った上書きやデータの不整合を防ぐために、同期されたテーブルに対して Postgres で次の操作のみを実行することをお勧めします。
- 読み取り専用クエリ
- インデックスの作成
- テーブルを削除する (Unity カタログから同期されたテーブルを削除した後に領域を解放するため)
このテーブルは他の方法で変更できますが、同期パイプラインに干渉します。
同期されたテーブルを削除する
同期されたテーブルを削除するには、Unity カタログから削除し、データベース インスタンス内のテーブルを削除する必要があります。 Unity カタログから同期されたテーブルを削除すると、テーブルの登録が解除され、データの更新が停止されます。 ただし、テーブルは基になる Postgres データベースに残ります。 データベース インスタンスの領域を解放するには、インスタンスに接続し、 DROP TABLE コマンドを使用します。
UI
- ワークスペースのサイドバーで [ カタログ ] をクリックします。
- 削除する同期テーブルを見つけて選択します。
-
クリックします。>削除します。
-
psql、SQL エディター、またはノートブックからインスタンスに接続します。 - PostgreSQL を使用してテーブルを削除します。
DROP TABLE synced_table_database.synced_table_schema.synced_table
Python SDK
from databricks.sdk import WorkspaceClient
# Initialize the Workspace client
w = WorkspaceClient()
# Delete a synced table from UC
synced_table_name = "catalog.schema.synced_table"
w.database.delete_synced_database_table(name=synced_table_name)
print(f"Deleted synced table from UC: {synced_table_name}")
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
CLI
# Delete a synced table from UC
databricks database delete-synced-database-table "catalog.schema.synced_table"
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
curl
# Delete a synced table from UC
curl -X DELETE --header "Authorization: Bearer ${DATABRICKS_TOKEN}" \
https://$WORKSPACE/api/2.0/database/synced_tables/$SYNCED_TABLE_NAME
# To free up space in your database instance, you need to connect to the
# instance and drop the table using PostgreSQL:
#
# DROP TABLE synced_table_database.synced_table_schema.synced_table;
所有権とアクセス許可
新しい Postgres データベース、スキーマ、またはテーブルを作成する場合、Postgres の所有権は次のように設定されます。
- Azure Databricks ログインが Postgres のロールとして存在する場合、データベース、スキーマ、またはテーブルを作成するユーザーに所有権が割り当てられます。 Postgres で Azure Databricks ID ロールを追加するには、「 Postgres ロールの管理」を参照してください。
- それ以外の場合、所有権は Postgres の親オブジェクトの所有者 (通常は
databricks_superuser) に割り当てられます。
同期されたテーブル アクセスの管理
同期されたテーブルが作成されると、 databricks_superuser は Postgres から同期されたテーブルを READ できます。
databricks_superuserにはpg_read_all_dataがあり、このロールはすべてのテーブルから読み取られます。 また、このロールはすべてのテーブルに書き込むための pg_write_all_data 特権も持ちます。 つまり、 databricks_superuser は Postgres で同期されたテーブルに書き込むこともできます。 Lakebase では、ターゲット テーブルに緊急の変更を加える必要がある場合に備えて、この書き込み動作がサポートされています。 ただし、Databricks では、代わりにソース テーブルで修正を行うことをお勧めします。
databricks_superuserは、次の権限を他のユーザーに付与することもできます。GRANT USAGE ON SCHEMA synced_table_schema TO user;GRANT SELECT ON synced_table_name TO user;databricks_superuserは、次の特権を取り消すことができます。REVOKE USAGE ON SCHEMA synced_table_schema FROM user;REVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
同期されたテーブル操作を管理する
databricks_superuserでは、同期されたテーブルに対して特定の操作を実行する権限を持つユーザーを管理できます。 同期テーブルでサポートされる操作は次のとおりです。
CREATE INDEXALTER INDEXDROP INDEXDROP TABLE
他のすべての DDL 操作は、同期されたテーブルに対して拒否されます。
これらの権限を追加のユーザーに付与するには、 databricks_superuser が最初に databricks_auth で拡張機能を作成する必要があります。
CREATE EXTENSION IF NOT EXISTS databricks_auth;
その後、 databricks_superuser は、同期されたテーブルを管理するユーザーを追加できます。
SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');
databricks_superuserは、同期されたテーブルの管理からユーザーを削除できます。
SELECT databricks_synced_table_remove_manager('[table]', '[user]');
databricks_superuserでは、すべてのマネージャーを表示できます。
SELECT * FROM databricks_synced_table_managers;
データ型のマッピング
この型マッピング テーブルは、ソース Unity カタログ テーブル内の各 データ型 を Postgres の同期先テーブルにマップする方法を定義します。
| ソース列の種類 | Postgres 列の種類 |
|---|---|
| BIGINT | BIGINT |
| バイナリ | BYTEA |
| ブーリアン | BOOLEAN |
| 日付 | DATE |
| DECIMAL(p,s) | 数値 |
| 複 | 倍精度 |
| 浮く | 本当の |
| INT | INTEGER |
| INTERVAL intervalQualifier | 間 |
| SMALLINT | スモールイント |
| 糸 | テキスト |
| タイムスタンプ | タイム ゾーンを含むタイムスタンプ |
| TIMESTAMP_NTZ | タイム ゾーンのないタイムスタンプ |
| TINYINT | スモールイント |
| GEOGRAPHY(srid) | サポートされていません |
| GEOMETRY(srid) | サポートされていません |
| ARRAY < elementType > | JSONB |
| MAP < keyType,valueType > | JSONB |
| STRUCT < [fieldName : fieldType [NOT NULL][COMMENT str][, ...]] > | JSONB |
| バリアント | JSONB |
| オブジェクト | サポートされていません |
注
- ARRAY、MAP、および STRUCT 型のマッピングは、2025 年 5 月に変更されました。 その前に作成されたテーブルを同期して、これらの型を JSON にマップし続けます。
- TIMESTAMP のマッピングは 2025 年 8 月に変更されました。 その前に作成されたテーブルを同期すると、タイム ゾーンなしで TIMESTAMP にマップされます。
無効な文字を処理する
null バイト (0x00) などの特定の文字は、デルタ テーブルの STRING、 ARRAY、 MAP、または STRUCT 列で使用できますが、Postgres TEXT または JSONB 列ではサポートされていません。 その結果、DeltaからPostgresにデータを同期することによって、挿入エラーなどの問題が発生する可能性があります。
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00
org.postgresql.util.PSQLException: ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text.
- 最初のエラーは、Postgres
TEXTに直接マップされる最上位の文字列列に null バイトが表示されるときに発生します。 - 2 番目のエラーは、Azure Databricks が
STRUCTとしてシリアル化する複合型 (ARRAY、MAP、またはJSONB) 内に入れ子になった文字列に null バイトが含まれている場合に発生します。 シリアル化中、すべての文字列は PostgresTEXTにキャストされます。ここで、\u0000は許可されません。
解決方法:
この問題には、次のいずれかの方法で対処できます。
文字列フィールドをサニタイズする
Postgres に同期する前に、複合型内の文字列フィールドを含むすべての文字列フィールドからサポートされていない文字を削除または置換します。
最上位の
STRING列から null バイトを削除するには、REPLACE関数を使用します。SELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table;バイナリへの変換 (
STRING列のみ)生のバイト コンテンツを保持する必要がある場合は、影響を受ける
STRING列をBINARYに変換します。
制限事項と考慮事項
サポートされているソース テーブル
同期テーブルの同期モードに応じて、さまざまな種類のソース テーブルがサポートされます。
スナップショット モードの場合、ソース テーブルは
SELECT *をサポートする必要があります。 たとえば、Delta テーブル、Iceberg テーブル、ビュー、具体化されたビュー、その他の同様の種類があります。トリガーモードまたは継続的同期モードでは、ソース テーブルで変更データ フィードも有効になっている必要があります。
名前付けと識別子の制限事項
-
使用できる文字: 同期されたテーブルの Postgres データベース、スキーマ、およびテーブル名には、英数字とアンダースコア (
[A-Za-z0-9_]+) のみを含めることができます。 ハイフン (-) およびその他の特殊文字はサポートされていません。 - 列とテーブルの識別子: ソース Unity カタログ テーブルの列名またはテーブル名に大文字または特殊文字を使用しないでください。 保持されている場合は、Postgres で参照するときにこれらの識別子を引用符で囲む必要があります。
パフォーマンスと同期
- 同期速度: 同期されたテーブルへのデータの同期は、追加の処理により、ネイティブ PostgreSQL クライアントを使用してデータベース インスタンスに直接データを書き込むよりも時間がかかる場合があります。 継続的同期モードでは、Unity カタログのマネージド テーブルから同期されたテーブルに、15 秒以上の間隔でデータが更新されます。
- 接続の使用法: 各テーブル同期では、データベース インスタンスへの最大 16 個の接続を使用できます。これは、インスタンスの接続制限にカウントされます。
- API のべき等性: 同期テーブルに関する API にはべき等性があり、操作をタイムリーに実行するために、エラー発生時に再試行が必要になる場合があります。
-
スキーマの変更:
TriggeredモードまたはContinuousモードの同期テーブルの場合、Unity カタログ テーブルからの追加スキーマ変更 (新しい列の追加など) のみが同期テーブルに反映されます。 - キーの重複: ソース テーブル内の 2 つの行の主キーが同じである場合、時系列キーを使用して重複除去を構成しない限り、同期パイプラインは失敗します。 ただし、時系列キーを使用すると、パフォーマンスが低下します。
- 更新率: 同期パイプラインは、容量ユニット (CU) あたり約 1,200 行/秒の連続書き込みをサポートし、CU あたり 1 秒あたり最大 15,000 行の一括書き込みをサポートします。
容量と制限
-
テーブルの制限:
- ソース テーブルあたり 20 個の同期テーブルの制限。
- 各テーブル同期では、最大 16 個のデータベース接続を使用できます。
-
サイズの制限と完全な更新:
- 同期されたテーブルを完全に更新した場合、Postgres の古いバージョンは、新しいテーブルが同期されるまで削除されません。 どちらのバージョンも、更新中に論理データベース サイズの制限に一時的にカウントされます。
- 個々の同期テーブルには制限はありませんが、インスタンス内のすべてのテーブルの論理データ サイズの合計制限は 2 TB です。 ただし、完全なテーブル再作成ではなく更新が必要な場合は、Databricks では 1 TB を超えないことをお勧めします。
- Unity カタログ テーブルの圧縮されていない行形式のサイズがデータベース インスタンスのサイズ制限 (2 TB) を超えた場合、同期は失敗します。 インスタンスにさらに書き込む前に、同期されたテーブルを Postgres で削除する必要があります。
カタログ統合
- カタログの重複: 別のデータベース カタログとしても登録されている Postgres データベースをターゲットとする標準カタログに同期テーブルを作成すると、同期されたテーブルが標準カタログとデータベース カタログの両方の下に Unity カタログに表示されます。