Предоставление данных Lakehouse с синхронизированными таблицами

Синхронизированные таблицы позволяют обслуживать данные озерохранилища с помощью Lakebase Postgres. Таблицы каталога Unity синхронизируются с Postgres, чтобы приложения могли запрашивать данные lakehouse непосредственно с низкой задержкой. Этот процесс обычно называется обратным ETL. Lakehouse оптимизирован для аналитики и обогащения, а Lakebase предназначен для операционных рабочих нагрузок, требующих быстрого поиска запросов и согласованности транзакций.

Схема архитектуры, показывающая поток данных из Lakehouse в Lakebase в приложения

Что такое синхронизированные таблицы?

Синхронизированные таблицы позволяют обслуживать данные аналитики из каталога Unity через Lakebase Postgres, что делает его доступным для приложений, которым требуются запросы с низкой задержкой и полные транзакции ACID. Они перекрывают разрыв между аналитическим хранилищем и операционными системами, сохраняя ваши данные готовыми к использованию в приложениях в режиме реального времени.

Поддерживаемые источники

Синхронизированные таблицы поддерживают следующие типы источников каталога Unity:

  • Управляемые и внешние таблицы Delta
  • Управляемые и внешние таблицы Iceberg
  • Представления и материализованные представления

Принцип работы

Синхронизированные таблицы Databricks создают управляемую копию данных каталога Unity в Lakebase. При создании синхронизированной таблицы вы получите следующее:

  1. Синхронизированная таблица в каталоге Unity, ссылающаяся на конвейер синхронизации
  2. Таблица Postgres в Lakebase (доступная только для чтения, запрашиваемая приложениями)

Схема, показывающая трехтабличные отношения в синхронизированных таблицах

Например, можно синхронизировать золотые таблицы, встроенные функции или выходные данные машинного обучения в analytics.gold.user_profiles новую синхронизированную таблицу analytics.gold.user_profiles_synced. В Postgres имя схемы каталога Unity становится именем схемы Postgres, поэтому это выглядит следующим образом gold.user_profiles_synced:

SELECT * FROM gold.user_profiles_synced WHERE user_id = 12345;

Приложения подключаются к стандартным драйверам Postgres и запрашивают синхронизированные данные вместе с собственным рабочим состоянием.

Предупреждение

Хотя можно изменить синхронизированную таблицу непосредственно в Postgres, Azure Databricks строго рекомендует выполнять только запросы на чтение для защиты целостности данных с помощью источника. Поддерживаемые операции с синхронизированными таблицами см. в разделе "Операции, разрешенные для синхронизированных таблиц в Postgres".

Конвейеры синхронизации используют управляемые Lakeflow Spark Declarative Pipelines для непрерывного обновления как синхронизированной таблицы Unity Catalog, так и таблицы Postgres с изменениями из исходной таблицы. Каждая синхронизация может использовать до 16 подключений к базе данных Lakebase.

Lakebase Postgres поддерживает до 1000 одновременных подключений с гарантиями транзакций, чтобы приложения могли считывать обогащенные данные, а также обрабатывать вставки, обновления и удаления в той же базе данных.

Режимы синхронизации

Выберите правильный режим синхронизации в зависимости от потребностей приложения:

Режим Описание Когда использовать Производительность
Моментальный снимок Однократная копия всех данных Изменения в источнике >10% строк за цикл или источник не поддерживает CDF (представления, таблицы Iceberg) 10x более эффективно при изменении >10% исходных данных
Запущено Запланированные обновления, которые выполняются по требованию или через интервалы Исходные строки изменяются по известному графику. Вставки, обновления и удаления распространяются каждый раз при обновлении. Хороший баланс затрат и задержки. Дорого, если запускать с интервалами в <5 минут
Непрерывный Потоковая передача в режиме реального времени с задержкой в секундах Изменения должны отображаться в Lakebase практически в режиме реального времени Низкая задержка, самая высокая стоимость. Минимальные 15-секундные интервалы

Триггерный и непрерывный режимы требуют включения канала данных изменений (CDF) в вашей исходной таблице. Если CDF не включен, вы увидите предупреждение в пользовательском интерфейсе с точной ALTER TABLE командой для выполнения. Дополнительные подробности о канале данных об изменениях см. в разделе "Использование канала данных об изменениях Delta Lake на Databricks".

Замечание

Источники, которые не поддерживают CDF (например, представления, материализованные представления и таблицы Iceberg), можно синхронизировать только в режиме моментального снимка . Для режима Snapshot источник должен поддерживать SELECT *.

Примеры вариантов использования

Синхронизированные таблицы можно использовать для таких вариантов использования, как:

  • Подсистемы персонализации, которые служат новым профилям пользователей в приложениях Databricks
  • Приложения, обслуживающие прогнозы модели или значения признаков, вычисляемые в lakehouse
  • Панели мониторинга, обслуживающие ключевые показатели эффективности в режиме реального времени
  • Службы обнаружения мошенничества, которые предоставляют оценки рисков для принятия немедленных мер
  • Средства поддержки, которые служат обогащенным записям клиентов из данных Lakehouse

Создание синхронизированной таблицы

Необходимые условия

Тебе нужно:

  • Рабочая область Databricks с активированной функцией Lakebase.
  • Проект Lakebase (см. раздел "Создание проекта").
  • Таблица каталога Unity для синхронизации.
  • Разрешения на создание синхронизированных таблиц. Вам потребуется USE_SCHEMA и CREATE_TABLE для любой используемой схемы.

Для режимов триггерного или непрерывного типа транслирование изменений данных должно быть включено в исходной таблице.

ALTER TABLE your_catalog.your_schema.your_table
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

Сведения о планировании емкости и совместимости типов данных см. в разделе "Типы данных" и "Планирование емкости".

Пользовательский интерфейс

  1. Перейдите в каталог на боковой панели рабочей области и выберите таблицу каталога Unity, которую вы хотите синхронизировать.

    Обозреватель каталогов с выбранной таблицей

  2. Щелкните "Создать>синхронизированную таблицу " в представлении сведений о таблице.

    Раскрывающийся список кнопки

  3. В диалоговом окне создания синхронизированной таблицы :

    Списки каталогов и схем включают только схемы каталога Unity, в которых текущий пользователь имеет USE_SCHEMAи CREATE_TABLE привилегии. Если вы не видите схему, которую вы ожидаете, подтвердите разрешения с помощью администратора каталога.

    1. Имя таблицы: введите имя синхронизированной таблицы (она создается в том же каталоге и схеме, что и исходная таблица). При этом создается синхронизированная таблица каталога Unity и таблица Postgres, которые можно запрашивать.

    2. Тип базы данных: выберите Lakebase Serverless (автомасштабирование).

    3. Режим синхронизации: выберите моментальный снимок, триггер или непрерывный в зависимости от ваших потребностей (см. режимы синхронизации выше).

    4. Настройте выбор проекта, ветви и базы данных.

    5. Проверьте правильность первичного ключа (обычно автоматическое обнаружение).

      Это важно

      Столбцы в первичном ключе не могут принимать значение NULL в синхронизированной таблице. Строки со значениями NULL в столбцах первичного ключа исключаются из синхронизации.

    6. (Необязательно) Если две строки в исходной таблице могут иметь один и тот же первичный ключ, выберите ключ временных рядов, чтобы настроить дедупликацию. При указании ключа таймерии синхронизированная таблица содержит только строку с последним значением ключа таймерии для каждого первичного ключа. Режим сбоя без ключа таймерии см. в разделе "Повторяющиеся ключи".

    Если вы выбрали триггерный или непрерывный режим и еще не включили канал изменений данных, вы увидите предупреждение с точной командой для запуска. Вопросы о совместимости типов данных см. в разделе "Типы данных" и "Совместимость".

    Нажмите кнопку "Создать", чтобы создать синхронизированную таблицу.

  4. Отслеживайте синхронизированную таблицу в каталоге. На вкладке "Обзор" отображается состояние синхронизации, конфигурация, состояние конвейера и метка времени последней синхронизации. Теперь используйте синхронизацию для обновления вручную.

пакет SDK Python

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.postgres import (
    SyncedTable,
    SyncedTableSyncedTableSpec,
    SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy,
)

w = WorkspaceClient()

synced_table = w.postgres.create_synced_table(
    synced_table=SyncedTable(spec=SyncedTableSyncedTableSpec(
        source_table_full_name="main.sales.orders",
        branch="projects/my-project/branches/production",
        primary_key_columns=["order_id"],
        scheduling_policy=SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy.SNAPSHOT,
        postgres_database="mydb",
        create_database_objects_if_missing=True,
    )),
    synced_table_id="my-catalog.sales.orders",
).wait()

print(f"Synced table created: {synced_table.name}")

Объект synced_table_id использует формат catalog.schema.table и становится именем синхронизированной таблицы в Unity Catalog. В Postgres таблица {table} создается в схеме {schema}, в базе данных, которую вы задаете с помощью postgres_database (здесь, mydb).

пакет SDK для Java

import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.service.postgres.*;
import java.util.List;

WorkspaceClient w = new WorkspaceClient();

SyncedTable syncedTable = w.postgres().createSyncedTable(
    new CreateSyncedTableRequest()
        .setSyncedTableId("my-catalog.sales.orders")
        .setSyncedTable(new SyncedTable()
            .setSpec(new SyncedTableSyncedTableSpec()
                .setSourceTableFullName("main.sales.orders")
                .setBranch("projects/my-project/branches/production")
                .setPrimaryKeyColumns(List.of("order_id"))
                .setSchedulingPolicy(SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy.SNAPSHOT)
                .setPostgresDatabase("mydb")
                .setCreateDatabaseObjectsIfMissing(true))))
    .waitForCompletion();

System.out.println("Synced table created: " + syncedTable.getName());

завиток

curl -X POST "https://your-workspace.cloud.databricks.com/api/2.0/postgres/synced_tables?synced_table_id=my-catalog.sales.orders" \
  -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "spec": {
      "source_table_full_name": "main.sales.orders",
      "branch": "projects/my-project/branches/production",
      "primary_key_columns": ["order_id"],
      "scheduling_policy": "SNAPSHOT",
      "postgres_database": "mydb",
      "create_database_objects_if_missing": true
    }
  }'

Это возвращает длительную операцию. Проверяйте возвращаемое name поле до done: true. См. долговременные операции. Сведения о настройке проверки подлинности см. в разделе "Проверка подлинности".

Планирование или активация последующих синхронизаций

При создании начальный снимок выполняется автоматически. Для режимов моментального снимка и триггеров последующие синхронизации должны запускаться явным образом. Непрерывный режим самоуправляющийся.

Задача конвейера синхронизации таблиц базы данных

Задача конвейера синхронизации таблиц базы данных в заданиях Lakeflow выполняет конвейер синхронизированной таблицы в качестве шага рабочего процесса. Настройте задание с помощью триггера обновления таблицы или расписания.

Триггер обновлений исходной таблицы

Запускает задание при обновлении исходной таблицы каталога Unity. В триггерном режиме новые изменения применяются инкрементально, обеспечивая свежесть данных почти в реальном времени и без постоянных затрат, присущих непрерывному режиму.

  1. На боковой панели щелкните "Рабочие процессы".
  2. Нажмите кнопку "Создать задание " или откройте существующее задание.
  3. На вкладке "Задачи " нажмите кнопку +Добавить другой тип задачи.
  4. В разделе "Прием и преобразование" выберите конвейер синхронизации таблиц базы данных.
  5. В поле конвейера выберите конвейер, связанный с синхронизированной таблицей.
  6. В разделе "Расписания и триггеры" нажмите кнопку "Добавить триггер".
  7. Выберите "Обновление таблицы " в качестве типа триггера.
  8. В разделе "Таблицы" выберите исходную таблицу каталога Unity для мониторинга.
  9. Нажмите кнопку Сохранить.

Триггер, действующий по расписанию

Выполняет синхронизацию с фиксированной частотой. Хорошо подходит для режима моментального снимка, где ежедневное или еженедельное полное обновление обычно является наиболее эффективным способом.

  1. Выполните действия 1–5 выше, чтобы добавить задачу конвейера синхронизации таблиц базы данных в задание.
  2. В разделе "Расписания и триггеры" нажмите кнопку "Добавить триггер".
  3. Выберите "Запланированный " в качестве типа триггера.
  4. Задайте расписание и часовой пояс cron, а затем нажмите кнопку "Сохранить".

Проверка состояния синхронизации

Чтобы проверить текущее состояние и время последней синхронизации синхронизированной таблицы:

Пользовательский интерфейс

В каталоге перейдите к синхронизированной таблице и перейдите на вкладку "Обзор ". В нем отображается текущее состояние синхронизации, состояние конвейера и метка времени последней синхронизации.

пакет SDK Python

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

table = w.postgres.get_synced_table("synced_tables/my-catalog.sales.orders")
print(f"State: {table.status.detailed_state}")
print(f"Last sync: {table.status.last_sync_time}")
print(f"Message: {table.status.message}")

пакет SDK для Java

import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.service.postgres.SyncedTable;

WorkspaceClient w = new WorkspaceClient();

SyncedTable table = w.postgres().getSyncedTable("synced_tables/my-catalog.sales.orders");
System.out.println("State: " + table.getStatus().getDetailedState());
System.out.println("Last sync: " + table.getStatus().getLastSyncTime());
System.out.println("Message: " + table.getStatus().getMessage());

завиток

curl "https://your-workspace.cloud.databricks.com/api/2.0/postgres/synced_tables/my-catalog.sales.orders" \
  -H "Authorization: Bearer ${DATABRICKS_TOKEN}"

Типы данных и совместимость

Типы данных каталога Unity сопоставляются с типами Postgres при создании синхронизированных таблиц. Сложные типы (ARRAY, MAP, STRUCT) хранятся в формате JSONB в Postgres.

Тип исходного столбца Тип столбца Postgres
БИГИНТ БИГИНТ
BINARY BYTEA
BOOLEAN BOOLEAN
DATE DATE
DECIMAL(p,s) ЧИСЛОВОЙ
ДВОЙНОЙ ДВОЙНАЯ ТОЧНОСТЬ
FLOAT РЕАЛЬНЫЙ
INT ЦЕЛОЕ ЧИСЛО
INTERVAL INTERVAL
СМОЛЛИНТ СМОЛЛИНТ
СТРУНА ТЕКСТ
TIMESTAMP Метка времени с часовым поясом
TIMESTAMP_NTZ МЕТКА ВРЕМЕНИ БЕЗ ЧАСОВОГО ПОЯСА
TINYINT СМОЛЛИНТ
ARRAY<типЭлемента JSONB
MAP<тип_ключа,тип_значения> JSONB
Имя поля STRUCT<:fieldType[, ...]> JSONB

Замечание

Типы GEOGRAPHY, GEOMETRY, VARIANT и OBJECT не поддерживаются.

Обработка недопустимых символов

Некоторые символы, такие как нулевые байты (0x00), разрешены в столбцах каталога Unity STRING, ARRAY, MAP или STRUCT, но не поддерживаются в столбцах Postgres TEXT или JSONB. Это может привести к сбоям синхронизации с такими ошибками:

ERROR: invalid byte sequence for encoding "UTF8": 0x00
ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text
  • Первая ошибка возникает, когда байт null отображается в строковом столбце верхнего уровня, который сопоставляется непосредственно с Postgres TEXT.
  • Вторая ошибка возникает, когда байт null отображается в строке, вложенной внутри сложного типа (STRUCTилиARRAYMAP), который сериализуется как JSONB. Во время сериализации все строки привязываются к Postgres TEXT, где \u0000 запрещено.

Решения:

  • Очистка строковых полей: удалите неподдерживаемые символы перед синхронизацией. Для нулевых байтов (NULL) в столбцах STRING:

    SELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table
    
  • Преобразование в BINARY: для столбцов STRING, в которых необходимо сохранить необработанные байты, преобразуйте в тип BINARY.

Планирование емкости

При планировании реализации синхронизированных таблиц рассмотрите следующие требования к ресурсам:

  • Использование подключений: Каждая синхронизированная таблица использует до 16 подключений к базе данных Lakebase, что засчитывается в совокупный лимит подключения экземпляра.
  • Ограничения размера: общий объем логических данных во всех синхронизированных таблицах составляет 8 ТБ. Отдельные таблицы не имеют ограничений, но Databricks рекомендует не превышать 1 ТБ для таблиц, требующих обновлений.
  • Размер полного обновления: при активации полного обновления старая версия в Postgres не удаляется до завершения новой синхронизации. Обе версии временно засчитываются в счет лимита размера логической базы данных во время обновления.
  • Таблиц на источник: для одной исходной таблицы можно синхронизировать до 20 таблиц.
  • Требования к именованию: имена баз данных, схем и таблиц могут содержать только буквенно-цифровые символы и символы подчеркивания ([A-Za-z0-9_]+).
  • Руководство по идентификатору источника. Избегайте использования прописных букв или специальных символов в именах столбцов или таблиц в исходной таблице каталога Unity. Если вы их оставите, то при обращении к ним в Postgres необходимо заключать эти идентификаторы в кавычки.
  • Эволюция схемы. Для триггерных и непрерывных режимов поддерживаются только изменения аддитивной схемы (например, добавление столбцов).
  • Повторяющиеся ключи: если две строки имеют один и тот же первичный ключ в исходной таблице, конвейер синхронизации завершается ошибкой, если не настроить дедупликацию с помощью ключа таймерии.
  • Идемпотентность API: API синхронизированных таблиц являются идемпотентными, поэтому повторите попытку при временных ошибках, чтобы обеспечить своевременные операции.
  • Частота обновления: для автомасштабирования Lakebase конвейер синхронизации поддерживает непрерывную запись и запись по триггеру со скоростью примерно 150 строк в секунду на единицу мощности (CU), а запись снимков — до 2 000 строк в секунду на CU.

Операции, разрешенные для синхронизированных таблиц в Postgres

Azure Databricks рекомендует выполнять только следующие операции в Postgres для синхронизированных таблиц, чтобы предотвратить случайные перезаписи или несоответствия данных:

  • Запросы только для чтения
  • Создание индексов
  • Удаление таблицы (чтобы освободить место после удаления синхронизированной таблицы из каталога Unity)

Хотя можно изменить синхронизированные таблицы в Postgres другими способами, он вмешивается в конвейер синхронизации.

Владение и разрешения

Если вы создаете новую базу данных Postgres, схему или таблицу, свойство Postgres устанавливается следующим образом:

  • Право владения назначается пользователю, создающему базу данных, схему или таблицу, если их имя входа в Azure Databricks существует в качестве ролью в Postgres. Чтобы добавить в Postgres роль удостоверения Azure Databricks, см. раздел Роли Postgres.
  • В противном случае владение назначается владельцу родительского объекта в Postgres (обычно databricks_superuser).

Управление доступом к синхронизированной таблице

После создания синхронизированной таблицы databricks_superuser может читать синхронизированную таблицу из Postgres. Этот databricks_superuserpg_read_all_dataпараметр позволяет считывать эту роль из всех таблиц. Он также имеет привилегию pg_write_all_data , которая позволяет этой роли записывать все таблицы. Это означает, что databricks_superuser можно также записывать в синхронизированную таблицу в Postgres. Lakebase поддерживает это поведение записи в случае, если необходимо внести срочные изменения в целевую таблицу. Однако Azure Databricks рекомендует вносить исправления в исходную таблицу.

  • Кроме того, databricks_superuser эти привилегии можно предоставить другим пользователям:

    GRANT USAGE ON SCHEMA synced_table_schema TO user;
    
    GRANT SELECT ON synced_table_name TO user;
    
  • databricks_superuser может отозвать эти привилегии:

    REVOKE USAGE ON SCHEMA synced_table_schema FROM user;
    
    REVOKE {SELECT | INSERT | UPDATE | DELETE} ON synced_table_name FROM user;
    

Управление синхронизированными операциями таблицы

databricks_superuser может управлять тем, какие пользователи уполномочены выполнять конкретные операции в синхронизированной таблице. Поддерживаемые операции для синхронизированных таблиц:

  • CREATE INDEX
  • ALTER INDEX
  • DROP INDEX
  • DROP TABLE

Все остальные операции DDL запрещены для синхронизированных таблиц.

Чтобы предоставить этим привилегиям дополнительным пользователям, databricks_superuser сначала необходимо создать расширение в databricks_auth:

CREATE EXTENSION IF NOT EXISTS databricks_auth;

databricks_superuser Затем пользователь может добавить пользователя для управления синхронизированной таблицей:

SELECT databricks_synced_table_add_manager('"synced_table_schema"."synced_table"'::regclass, '[user]');

databricks_superuser может удалить пользователя из управления синхронизированной таблицей.

SELECT databricks_synced_table_remove_manager('[table]', '[user]');

databricks_superuser может просматривать всех руководителей:

SELECT * FROM databricks_synced_table_managers;

Удаление синхронизированной таблицы

При удалении синхронизированной таблицы из каталога Unity также удаляется соответствующая таблица Postgres.

Пользовательский интерфейс

В каталоге найдите синхронизированную таблицу, щелкните значок меню Kebab. Меню и нажмите кнопку "Удалить".

пакет SDK Python

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

w.postgres.delete_synced_table("synced_tables/my-catalog.sales.orders").wait()

пакет SDK для Java

import com.databricks.sdk.WorkspaceClient;

WorkspaceClient w = new WorkspaceClient();

w.postgres().deleteSyncedTable("synced_tables/my-catalog.sales.orders").waitForCompletion();

завиток

curl -X DELETE "https://your-workspace.cloud.databricks.com/api/2.0/postgres/synced_tables/my-catalog.sales.orders" \
  -H "Authorization: Bearer ${DATABRICKS_TOKEN}"

Узнать больше

задачи Описание
Создание проекта Настройка проекта Lakebase
Подключение к базе данных Сведения о параметрах подключения для Lakebase
Регистрация базы данных в каталоге Unity Сделать данные Lakebase видимыми в каталоге Unity для унифицированного управления и кросс-исходных запросов
Интеграция каталога Unity Общие сведения об управлении и разрешениях

Интеграция с каталогом

  • Дублирование каталога: Создание синхронизированной таблицы в стандартном каталоге, предназначенной для базы данных Postgres, которая также зарегистрирована в качестве отдельного каталога базы данных, приводит к тому, что синхронизированная таблица будет отображаться в каталоге Unity как в стандартном, так и в каталогах баз данных.

Другие варианты

Сведения о решениях Partner Connect по обратному ETL для синхронизации данных с системами, отличными от Databricks, см. таких, как Census или Hightouch.