Поделиться через


Обратный ETL с проектами Lakebase

Это важно

Автомасштабирование Lakebase находится в бета-версии в следующих регионах: eastus2westeuropewestus.

Автомасштабирование Lakebase — это последняя версия Lakebase с автомасштабированием вычислений, масштабированием до нуля, ветвлением и мгновенным восстановлением. Сравнение функций с Lakebase Provisioned см. в разделе выбора между версиями.

Обратный ETL в Lakebase синхронизирует таблицы Unity Catalog с Postgres, что позволяет приложениям напрямую использовать отобранные данные из сховища данных. Lakehouse оптимизирован для аналитики и обогащения, а Lakebase предназначен для операционных рабочих нагрузок, требующих быстрых запросов и согласованности транзакций.

Схема архитектуры, показывающая поток данных из Lakehouse в Lakebase в приложения

Что такое обратный ETL?

Обратный ETL позволяет перемещать данные аналитики из каталога Unity в Lakebase Postgres, где можно сделать доступными для приложений, которым требуются запросы с низкой задержкой (под 10 мс) и полные транзакции ACID. Это помогает преодолеть разрыв между аналитическим хранилищем и операционными системами, сохраняя подготовленные данные в приложениях в режиме реального времени.

Принцип работы

Синхронизированные таблицы Databricks создают управляемую копию данных каталога Unity в Lakebase. При создании синхронизированной таблицы вы получите следующее:

  1. Новая таблица каталога Unity (только для чтения, управляемая конвейером синхронизации)
  2. Таблица Postgres в Lakebase (запрашиваемая приложениями)

Схема, показывающая взаимосвязь трех таблиц в обратном процессе ETL

Например, можно синхронизировать золотые таблицы, встроенные функции или выходные данные машинного обучения в analytics.gold.user_profiles новую синхронизированную таблицу analytics.gold.user_profiles_synced. В Postgres имя схемы каталога Unity становится именем схемы Postgres, поэтому это выглядит следующим образом "gold"."user_profiles_synced":

SELECT * FROM "gold"."user_profiles_synced" WHERE "user_id" = 12345;

Приложения подключаются к стандартным драйверам Postgres и запрашивают синхронизированные данные вместе с собственным рабочим состоянием.

Конвейеры синхронизации используют управляемые Lakeflow Spark Declarative Pipelines для непрерывного обновления как синхронизированной таблицы Unity Catalog, так и таблицы Postgres с изменениями из исходной таблицы. Каждая синхронизация может использовать до 16 подключений к базе данных Lakebase.

Lakebase Postgres поддерживает до 1000 одновременных подключений с гарантиями транзакций, чтобы приложения могли считывать обогащенные данные, а также обрабатывать вставки, обновления и удаления в той же базе данных.

Режимы синхронизации

Выберите правильный режим синхронизации в зависимости от потребностей приложения:

Mode Description Лучше всего подходит для Performance
Моментальный снимок Однократная копия всех данных Начальная настройка или исторический анализ 10x более эффективно при изменении >10% исходных данных
Запущено Запланированные обновления, которые выполняются по требованию или через интервалы Панели мониторинга, обновляемые почасовой или ежедневно Хороший баланс затрат и задержки. Дорого, если запускать с интервалами в <5 минут
Непрерывный Потоковая передача в режиме реального времени с задержкой в секундах Динамические приложения (более высокая стоимость из-за выделенных вычислений) Низкая задержка, самая высокая стоимость. Минимальные 15-секундные интервалы

Триггерный и непрерывный режимы требуют включения канала данных изменений (CDF) в вашей исходной таблице. Если CDF не включен, вы увидите предупреждение в пользовательском интерфейсе с точной ALTER TABLE командой для выполнения. Дополнительные подробности о канале данных об изменениях см. в разделе "Использование канала данных об изменениях Delta Lake на Databricks".

Примеры вариантов использования

Обратный ETL с Lakebase поддерживает распространенные операционные сценарии:

  • Подсистемы персонализации, которые нуждаются в новых профилях пользователей, синхронизированных с Databricks Apps
  • Приложения, обслуживающие прогнозы модели или значения признаков, вычисляемые в lakehouse
  • Панели мониторинга с поддержкой клиентов, отображающие ключевые показатели эффективности в режиме реального времени
  • Службы обнаружения мошенничества, которые нуждаются в оценках рисков, доступных для немедленного действия
  • Инструменты поддержки, которые обогащают данные клиентов с проверенными данными из Lakehouse

Создание синхронизированной таблицы (пользовательский интерфейс)

Синхронизированные таблицы можно создавать в пользовательском интерфейсе Databricks или программно с помощью пакета SDK. Рабочий процесс пользовательского интерфейса описан ниже.

Предпосылки

Тебе нужно:

  • Рабочая область Databricks с активированной функцией Lakebase.
  • Проект Lakebase (см. раздел "Создание проекта").
  • Таблица каталога Unity с проверенными данными.
  • Разрешения на создание синхронизированных таблиц.

Сведения о планировании емкости и совместимости типов данных см. в разделе "Типы данных" и "Планирование емкости".

Шаг 1. Выбор исходной таблицы

Перейдите в каталог на боковой панели рабочей области и выберите таблицу каталога Unity, которую вы хотите синхронизировать.

Обозреватель каталогов с выбранной таблицей

Шаг 2. Включите поток данных об изменениях (при необходимости)

Если вы планируете использовать режимы активации или непрерывной синхронизации, в исходной таблице должен быть включён поток изменений данных. Проверьте, включена ли CDF в вашей таблице, или выполните следующую команду в редакторе SQL или записной книжке:

ALTER TABLE your_catalog.your_schema.your_table
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

Замените your_catalog.your_schema.your_table фактическим именем таблицы.

Шаг 3. Создание синхронизированной таблицы

Щелкните "Создать>синхронизированную таблицу " в представлении сведений о таблице.

Раскрывающийся список кнопки

Шаг 4. Настройка

В диалоговом окне создания синхронизированной таблицы :

  1. Имя таблицы: введите имя синхронизированной таблицы (она создается в том же каталоге и схеме, что и исходная таблица). При этом создается синхронизированная таблица каталога Unity и таблица Postgres, которые можно запрашивать.
  2. Тип базы данных: выберите Lakebase Serverless (автомасштабирование).
  3. Режим синхронизации: выберите моментальный снимок, триггер или непрерывный в зависимости от ваших потребностей (см. режимы синхронизации выше).
  4. Настройте выбор проекта, ветви и базы данных.
  5. Проверьте правильность первичного ключа (обычно автоматическое обнаружение).

Если вы выбрали триггерный или непрерывный режим и еще не включили канал изменений данных, вы увидите предупреждение с точной командой для запуска. Вопросы о совместимости типов данных см. в разделе "Типы данных" и "Совместимость".

Нажмите кнопку "Создать", чтобы создать синхронизированную таблицу.

Шаг 5. Мониторинг

После создания отслеживайте синхронизированную таблицу в каталоге. На вкладке "Обзор" отображается состояние синхронизации, конфигурация, состояние конвейера и метка времени последней синхронизации. Теперь используйте синхронизацию для обновления вручную.

Типы данных и совместимость

Типы данных каталога Unity сопоставляются с типами Postgres при создании синхронизированных таблиц. Сложные типы (ARRAY, MAP, STRUCT) хранятся в формате JSONB в Postgres.

Тип исходного столбца Тип столбца Postgres
BIGINT BIGINT
BINARY BYTEA
BOOLEAN BOOLEAN
DATE DATE
DECIMAL(p,s) ЧИСЛОВОЙ
ДВОЙНОЙ ДВОЙНАЯ ТОЧНОСТЬ
FLOAT РЕАЛЬНЫЙ
INT ЦЕЛОЕ ЧИСЛО
INTERVAL INTERVAL
СМОЛЛИНТ СМОЛЛИНТ
СТРУНА ТЕКСТ
TIMESTAMP Метка времени с часовым поясом
TIMESTAMP_NTZ МЕТКА ВРЕМЕНИ БЕЗ ЧАСОВОГО ПОЯСА
TINYINT СМОЛЛИНТ
ARRAY<типЭлемента JSONB
MAP<тип_ключа,тип_значения> JSONB
Имя поля STRUCT<:fieldType[, ...]> JSONB

Замечание

Типы GEOGRAPHY, GEOMETRY, VARIANT и OBJECT не поддерживаются.

Обработка недопустимых символов

Некоторые символы, такие как нулевые байты (0x00), разрешены в столбцах каталога Unity STRING, ARRAY, MAP или STRUCT, но не поддерживаются в столбцах Postgres TEXT или JSONB. Это может привести к сбоям синхронизации с такими ошибками:

ERROR: invalid byte sequence for encoding "UTF8": 0x00
ERROR: unsupported Unicode escape sequence DETAIL: \u0000 cannot be converted to text

Решения:

  • Очистка строковых полей: удалите неподдерживаемые символы перед синхронизацией. Для нулевых байтов (NULL) в столбцах STRING:

    SELECT REPLACE(column_name, CAST(CHAR(0) AS STRING), '') AS cleaned_column FROM your_table
    
  • Преобразование в BINARY: для столбцов STRING, в которых необходимо сохранить необработанные байты, преобразуйте в тип BINARY.

Программное создание

Для рабочих процессов автоматизации можно создавать синхронизированные таблицы программными средствами с помощью пакета SDK Databricks, CLI или REST API.

Пакет SDK для Python

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import (
    SyncedDatabaseTable,
    SyncedTableSpec,
    NewPipelineSpec,
    SyncedTableSchedulingPolicy
)

# Initialize the Workspace client
w = WorkspaceClient()

# Create a synced table
synced_table = w.database.create_synced_database_table(
    SyncedDatabaseTable(
        name="lakebase_catalog.schema.synced_table",  # Full three-part name
        spec=SyncedTableSpec(
            source_table_full_name="analytics.gold.user_profiles",
            primary_key_columns=["user_id"],  # Primary key columns
            scheduling_policy=SyncedTableSchedulingPolicy.TRIGGERED,  # SNAPSHOT, TRIGGERED, or CONTINUOUS
            new_pipeline_spec=NewPipelineSpec(
                storage_catalog="lakebase_catalog",
                storage_schema="staging"
            )
        ),
    )
)
print(f"Created synced table: {synced_table.name}")

# Check the status of a synced table
status = w.database.get_synced_database_table(name=synced_table.name)
print(f"Synced table status: {status.data_synchronization_status.detailed_state}")
print(f"Status message: {status.data_synchronization_status.message}")

интерфейс командной строки (CLI)

# Create a synced table
databricks database create-synced-database-table \
  --json '{
    "name": "lakebase_catalog.schema.synced_table",
    "spec": {
      "source_table_full_name": "analytics.gold.user_profiles",
      "primary_key_columns": ["user_id"],
      "scheduling_policy": "TRIGGERED",
      "new_pipeline_spec": {
        "storage_catalog": "lakebase_catalog",
        "storage_schema": "staging"
      }
    }
  }'

# Check the status of a synced table
databricks database get-synced-database-table "lakebase_catalog.schema.synced_table"

REST API

export WORKSPACE_URL="https://your-workspace.cloud.databricks.com"
export DATABRICKS_TOKEN="your-token"

# Create a synced table
curl -X POST "$WORKSPACE_URL/api/2.0/database/synced_tables" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN" \
  --data '{
    "name": "lakebase_catalog.schema.synced_table",
    "spec": {
      "source_table_full_name": "analytics.gold.user_profiles",
      "primary_key_columns": ["user_id"],
      "scheduling_policy": "TRIGGERED",
      "new_pipeline_spec": {
        "storage_catalog": "lakebase_catalog",
        "storage_schema": "staging"
      }
    }
  }'

# Check the status
curl -X GET "$WORKSPACE_URL/api/2.0/database/synced_tables/lakebase_catalog.schema.synced_table" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN"

Планирование емкости

При планировании обратной реализации ETL рассмотрите следующие требования к ресурсам:

  • Использование подключений: Каждая синхронизированная таблица использует до 16 подключений к базе данных Lakebase, что засчитывается в совокупный лимит подключения экземпляра.
  • Ограничения размера: общий объем логических данных во всех синхронизированных таблицах составляет 8 ТБ. Отдельные таблицы не имеют ограничений, но Databricks рекомендует не превышать 1 ТБ для таблиц, требующих обновлений.
  • Требования к именованию: имена баз данных, схем и таблиц могут содержать только буквенно-цифровые символы и символы подчеркивания ([A-Za-z0-9_]+).
  • Эволюция схемы. Для триггерных и непрерывных режимов поддерживаются только изменения аддитивной схемы (например, добавление столбцов).
  • Скорость обновления:: для автомасштабирования в Lakebase конвейер синхронизации поддерживает непрерывную и триггерную запись с примерно 150 строками в секунду на единицу емкости (CU) и запись снимков до 2000 строк в секунду на CU.

Удаление синхронизированной таблицы

Чтобы удалить синхронизированную таблицу, необходимо удалить ее из каталога Unity и Postgres:

  1. Удалите из каталога Unity: в каталоге найдите синхронизированную таблицу, щелкните значок меню Kebab и выберите «Удалить». Это останавливает обновление данных, но оставляет таблицу в Postgres.

  2. Удалите данные из Postgres: подключитесь к базе данных Lakebase и удалите таблицу, чтобы освободить место:

    DROP TABLE your_database.your_schema.your_table;
    

Для подключения к Postgres можно использовать редактор SQL или внешние средства.

Подробнее

Задача Description
Создание проекта Настройка проекта Lakebase
Подключение к базе данных Сведения о параметрах подключения для Lakebase
Регистрация базы данных в каталоге Unity Сделать данные Lakebase видимыми в каталоге Unity для унифицированного управления и кросс-исходных запросов
Интеграция каталога Unity Общие сведения об управлении и разрешениях

Другие варианты

Сведения о решениях Partner Connect по обратному ETL для синхронизации данных с системами, отличными от Databricks, см. таких, как Census или Hightouch.