Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Это важно
Автомасштабирование Lakebase находится в бета-версии в следующих регионах: eastus2westeuropewestus.
Автомасштабирование Lakebase — это последняя версия Lakebase с автомасштабированием вычислений, масштабированием до нуля, ветвлением и мгновенным восстановлением. Сравнение функций с Lakebase Provisioned см. в разделе выбора между версиями.
Обратный ETL в Lakebase синхронизирует таблицы Unity Catalog с Postgres, что позволяет приложениям напрямую использовать отобранные данные из сховища данных. Lakehouse оптимизирован для аналитики и обогащения, а Lakebase предназначен для операционных рабочих нагрузок, требующих быстрых запросов и согласованности транзакций.
Что такое обратный ETL?
Обратный ETL позволяет перемещать данные аналитики из каталога Unity в Lakebase Postgres, где можно сделать доступными для приложений, которым требуются запросы с низкой задержкой (под 10 мс) и полные транзакции ACID. Это помогает преодолеть разрыв между аналитическим хранилищем и операционными системами, сохраняя подготовленные данные в приложениях в режиме реального времени.
Принцип работы
Синхронизированные таблицы Databricks создают управляемую копию данных каталога Unity в Lakebase. При создании синхронизированной таблицы вы получите следующее:
- Новая таблица каталога Unity (только для чтения, управляемая конвейером синхронизации)
- Таблица Postgres в Lakebase (запрашиваемая приложениями)
Например, можно синхронизировать золотые таблицы, встроенные функции или выходные данные машинного обучения в analytics.gold.user_profiles новую синхронизированную таблицу analytics.gold.user_profiles_synced. В Postgres имя схемы каталога Unity становится именем схемы Postgres, поэтому это выглядит следующим образом "gold"."user_profiles_synced":
SELECT * FROM "gold"."user_profiles_synced" WHERE "user_id" = 12345;
Приложения подключаются к стандартным драйверам Postgres и запрашивают синхронизированные данные вместе с собственным рабочим состоянием.
Конвейеры синхронизации используют управляемые Lakeflow Spark Declarative Pipelines для непрерывного обновления как синхронизированной таблицы Unity Catalog, так и таблицы Postgres с изменениями из исходной таблицы. Каждая синхронизация может использовать до 16 подключений к базе данных Lakebase.
Lakebase Postgres поддерживает до 1000 одновременных подключений с гарантиями транзакций, чтобы приложения могли считывать обогащенные данные, а также обрабатывать вставки, обновления и удаления в той же базе данных.
Режимы синхронизации
Выберите правильный режим синхронизации в зависимости от потребностей приложения:
| Mode | Description | Лучше всего подходит для | Performance |
|---|---|---|---|
| Моментальный снимок | Однократная копия всех данных | Начальная настройка или исторический анализ | 10x более эффективно при изменении >10% исходных данных |
| Запущено | Запланированные обновления, которые выполняются по требованию или через интервалы | Панели мониторинга, обновляемые почасовой или ежедневно | Хороший баланс затрат и задержки. Дорого, если запускать с интервалами в <5 минут |
| Непрерывный | Потоковая передача в режиме реального времени с задержкой в секундах | Динамические приложения (более высокая стоимость из-за выделенных вычислений) | Низкая задержка, самая высокая стоимость. Минимальные 15-секундные интервалы |
Триггерный и непрерывный режимы требуют включения канала данных изменений (CDF) в вашей исходной таблице. Если CDF не включен, вы увидите предупреждение в пользовательском интерфейсе с точной ALTER TABLE командой для выполнения. Дополнительные подробности о канале данных об изменениях см. в разделе "Использование канала данных об изменениях Delta Lake на Databricks".
Примеры вариантов использования
Обратный ETL с Lakebase поддерживает распространенные операционные сценарии:
- Подсистемы персонализации, которые нуждаются в новых профилях пользователей, синхронизированных с Databricks Apps
- Приложения, обслуживающие прогнозы модели или значения признаков, вычисляемые в lakehouse
- Панели мониторинга с поддержкой клиентов, отображающие ключевые показатели эффективности в режиме реального времени
- Службы обнаружения мошенничества, которые нуждаются в оценках рисков, доступных для немедленного действия
- Инструменты поддержки, которые обогащают данные клиентов с проверенными данными из Lakehouse
Создание синхронизированной таблицы (пользовательский интерфейс)
Синхронизированные таблицы можно создавать в пользовательском интерфейсе Databricks или программно с помощью пакета SDK. Рабочий процесс пользовательского интерфейса описан ниже.
Предпосылки
Тебе нужно:
- Рабочая область Databricks с активированной функцией Lakebase.
- Проект Lakebase (см. раздел "Создание проекта").
- Таблица каталога Unity с проверенными данными.
- Разрешения на создание синхронизированных таблиц.
Сведения о планировании емкости и совместимости типов данных см. в разделе "Типы данных" и "Планирование емкости".
Шаг 1. Выбор исходной таблицы
Перейдите в каталог на боковой панели рабочей области и выберите таблицу каталога Unity, которую вы хотите синхронизировать.
Шаг 2. Включите поток данных об изменениях (при необходимости)
Если вы планируете использовать режимы активации или непрерывной синхронизации, в исходной таблице должен быть включён поток изменений данных. Проверьте, включена ли CDF в вашей таблице, или выполните следующую команду в редакторе SQL или записной книжке:
ALTER TABLE your_catalog.your_schema.your_table
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Замените your_catalog.your_schema.your_table фактическим именем таблицы.
Шаг 3. Создание синхронизированной таблицы
Щелкните "Создать>синхронизированную таблицу " в представлении сведений о таблице.
Шаг 4. Настройка
В диалоговом окне создания синхронизированной таблицы :
- Имя таблицы: введите имя синхронизированной таблицы (она создается в том же каталоге и схеме, что и исходная таблица). При этом создается синхронизированная таблица каталога Unity и таблица Postgres, которые можно запрашивать.
- Тип базы данных: выберите Lakebase Serverless (автомасштабирование).
- Режим синхронизации: выберите моментальный снимок, триггер или непрерывный в зависимости от ваших потребностей (см. режимы синхронизации выше).
- Настройте выбор проекта, ветви и базы данных.
- Проверьте правильность первичного ключа (обычно автоматическое обнаружение).
Если вы выбрали триггерный или непрерывный режим и еще не включили канал изменений данных, вы увидите предупреждение с точной командой для запуска. Вопросы о совместимости типов данных см. в разделе "Типы данных" и "Совместимость".
Нажмите кнопку "Создать", чтобы создать синхронизированную таблицу.
Шаг 5. Мониторинг
После создания отслеживайте синхронизированную таблицу в каталоге. На вкладке "Обзор" отображается состояние синхронизации, конфигурация, состояние конвейера и метка времени последней синхронизации. Теперь используйте синхронизацию для обновления вручную.
Типы данных и совместимость
Типы данных каталога Unity сопоставляются с типами Postgres при создании синхронизированных таблиц. Сложные типы (ARRAY, MAP, STRUCT) хранятся в формате JSONB в Postgres.
| Тип исходного столбца | Тип столбца Postgres |
|---|---|
| BIGINT | BIGINT |
| BINARY | BYTEA |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| DECIMAL(p,s) | ЧИСЛОВОЙ |
| ДВОЙНОЙ | ДВОЙНАЯ ТОЧНОСТЬ |
| FLOAT | РЕАЛЬНЫЙ |
| INT | ЦЕЛОЕ ЧИСЛО |
| INTERVAL | INTERVAL |
| СМОЛЛИНТ | СМОЛЛИНТ |
| СТРУНА | ТЕКСТ |
| TIMESTAMP | Метка времени с часовым поясом |
| TIMESTAMP_NTZ | МЕТКА ВРЕМЕНИ БЕЗ ЧАСОВОГО ПОЯСА |
| TINYINT | СМОЛЛИНТ |
| ARRAY<типЭлемента |
JSONB |
| MAP<тип_ключа,тип_значения> | JSONB |
| Имя поля STRUCT<:fieldType[, ...]> | JSONB |
Замечание
Типы GEOGRAPHY, GEOMETRY, VARIANT и OBJECT не поддерживаются.
Обработка недопустимых символов
Некоторые символы, такие как нулевые байты (0x00), разрешены в столбцах каталога Unity STRING, ARRAY, MAP или STRUCT, но не поддерживаются в столбцах Postgres TEXT или JSONB. Это может привести к сбоям синхронизации с такими ошибками:
ERROR: invalid byte sequence for encoding "UTF8": 0x00
ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text
Решения:
Очистка строковых полей: удалите неподдерживаемые символы перед синхронизацией. Для нулевых байтов (NULL) в столбцах STRING:
SELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_tableПреобразование в BINARY: для столбцов STRING, в которых необходимо сохранить необработанные байты, преобразуйте в тип BINARY.
Программное создание
Для рабочих процессов автоматизации можно создавать синхронизированные таблицы программными средствами с помощью пакета SDK Databricks, CLI или REST API.
Пакет SDK для Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import (
SyncedDatabaseTable,
SyncedTableSpec,
NewPipelineSpec,
SyncedTableSchedulingPolicy
)
# Initialize the Workspace client
w = WorkspaceClient()
# Create a synced table
synced_table = w.database.create_synced_database_table(
SyncedDatabaseTable(
name="lakebase_catalog.schema.synced_table", # Full three-part name
spec=SyncedTableSpec(
source_table_full_name="analytics.gold.user_profiles",
primary_key_columns=["user_id"], # Primary key columns
scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED, # SNAPSHOT, TRIGGERED, or CONTINUOUS
new_pipeline_spec=NewPipelineSpec(
storage_catalog="lakebase_catalog",
storage_schema="staging"
)
),
)
)
print(f"Created synced table: {synced_table.name}")
# Check the status of a synced table
status = w.database.get_synced_database_table(name=synced_table.name)
print(f"Synced table status: {status.data_synchronization_status.detailed_state}")
print(f"Status message: {status.data_synchronization_status.message}")
интерфейс командной строки (CLI)
# Create a synced table
databricks database create-synced-database-table \
--json '{
"name": "lakebase_catalog.schema.synced_table",
"spec": {
"source_table_full_name": "analytics.gold.user_profiles",
"primary_key_columns": ["user_id"],
"scheduling_policy": "TRIGGERED",
"new_pipeline_spec": {
"storage_catalog": "lakebase_catalog",
"storage_schema": "staging"
}
}
}'
# Check the status of a synced table
databricks database get-synced-database-table "lakebase_catalog.schema.synced_table"
REST API
export WORKSPACE_URL="https://your-workspace.cloud.databricks.com"
export DATABRICKS_TOKEN="your-token"
# Create a synced table
curl -X POST "$WORKSPACE_URL/api/2.0/database/synced_tables" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $DATABRICKS_TOKEN" \
--data '{
"name": "lakebase_catalog.schema.synced_table",
"spec": {
"source_table_full_name": "analytics.gold.user_profiles",
"primary_key_columns": ["user_id"],
"scheduling_policy": "TRIGGERED",
"new_pipeline_spec": {
"storage_catalog": "lakebase_catalog",
"storage_schema": "staging"
}
}
}'
# Check the status
curl -X GET "$WORKSPACE_URL/api/2.0/database/synced_tables/lakebase_catalog.schema.synced_table" \
-H "Authorization: Bearer $DATABRICKS_TOKEN"
Планирование емкости
При планировании обратной реализации ETL рассмотрите следующие требования к ресурсам:
- Использование подключений: Каждая синхронизированная таблица использует до 16 подключений к базе данных Lakebase, что засчитывается в совокупный лимит подключения экземпляра.
- Ограничения размера: общий объем логических данных во всех синхронизированных таблицах составляет 8 ТБ. Отдельные таблицы не имеют ограничений, но Databricks рекомендует не превышать 1 ТБ для таблиц, требующих обновлений.
-
Требования к именованию: имена баз данных, схем и таблиц могут содержать только буквенно-цифровые символы и символы подчеркивания (
[A-Za-z0-9_]+). - Эволюция схемы. Для триггерных и непрерывных режимов поддерживаются только изменения аддитивной схемы (например, добавление столбцов).
- Скорость обновления:: для автомасштабирования в Lakebase конвейер синхронизации поддерживает непрерывную и триггерную запись с примерно 150 строками в секунду на единицу емкости (CU) и запись снимков до 2000 строк в секунду на CU.
Удаление синхронизированной таблицы
Чтобы удалить синхронизированную таблицу, необходимо удалить ее из каталога Unity и Postgres:
Удалите из каталога Unity: в каталоге найдите синхронизированную таблицу, щелкните
и выберите «Удалить». Это останавливает обновление данных, но оставляет таблицу в Postgres.
Удалите данные из Postgres: подключитесь к базе данных Lakebase и удалите таблицу, чтобы освободить место:
DROP TABLE your_database.your_schema.your_table;
Для подключения к Postgres можно использовать редактор SQL или внешние средства.
Подробнее
| Задача | Description |
|---|---|
| Создание проекта | Настройка проекта Lakebase |
| Подключение к базе данных | Сведения о параметрах подключения для Lakebase |
| Регистрация базы данных в каталоге Unity | Сделать данные Lakebase видимыми в каталоге Unity для унифицированного управления и кросс-исходных запросов |
| Интеграция каталога Unity | Общие сведения об управлении и разрешениях |
Другие варианты
Сведения о решениях Partner Connect по обратному ETL для синхронизации данных с системами, отличными от Databricks, см. таких, как Census или Hightouch.