Перемещение таблиц между конвейерами

В этой статье описывается перемещение потоковых таблиц и материализованных представлений между конвейерами. После перемещения поток в новый конвейер, именно этот новый конвейер обновляет таблицу, а не исходный. Это полезно во многих сценариях, в том числе:

Требования

Ниже приведены требования к перемещению таблицы между конвейерами.

  • При выполнении ALTER ... команды необходимо использовать Databricks Runtime 16.3 или более поздней версии и Databricks Runtime 17.2 для перемещения таблицы между рабочими областями.

  • Как исходные, так и конечные конвейеры должны находиться в рабочих областях, которые совместно используют хранилище метаданных. Чтобы проверить хранилище метаданных, см. current_metastore функцию.

  • Учетная запись пользователя или учетная запись службы, выполняющая операцию, должна выполняться от имени пользователя как в исходном, так и в целевом конвейере.

  • Целевой конвейер должен использовать режим публикации по умолчанию. Это позволяет публиковать таблицы в нескольких каталогах и схемах.

    Кроме того, оба конвейера должны использовать устаревший режим публикации, и оба должны иметь одно и то же каталог и целевое значение в параметрах. Сведения о устаревшем режиме публикации см. в схеме LIVE (устаревшая версия).

Замечание

Эта функция не поддерживает перемещение конвейера из режима публикации по умолчанию в конвейер с устаревшим режимом публикации.

Переместить таблицу между конвейерами

В следующих инструкциях описывается перемещение потоковой таблицы или материализованного представления из одного конвейера в другой.

  1. Остановите исходный конвейер, если он запущен. Подождите, пока он полностью остановится.

  2. Удалите определение таблицы из кода исходного конвейера и сохраните ее где-то для дальнейшего использования.

    Включите все вспомогательные запросы или код, необходимые для правильного выполнения конвейера.

  3. В записной книжке или редакторе SQL выполните следующую команду SQL, чтобы переназначить таблицу из исходного конвейера в целевой конвейер:

    ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
      SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");
    

    Обратите внимание, что команда SQL должна выполняться из рабочей области исходного конвейера.

    Команда использует ALTER MATERIALIZED VIEW и ALTER STREAMING TABLE для материализованных представлений, управляемых каталогом Unity, и потоковых таблиц соответственно. Чтобы выполнить то же действие в таблице хранилища метаданных Hive, используйте ALTER TABLE.

    Например, если вы хотите переместить потоковую таблицу с именем sales в конвейер с идентификатором abcd1234-ef56-ab78-cd90-1234efab5678, выполните следующую команду.

    ALTER STREAMING TABLE sales
      SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
    

    Замечание

    Должен pipelineId быть допустимым идентификатором конвейера. Значение null не разрешено.

  4. Добавьте определение таблицы в код целевого конвейера.

    Замечание

    Если схема каталога или целевая схема отличаются между источником и назначением, точное копирование запроса может не работать. Частично квалифицированные таблицы в определении могут трактоваться по-разному. При переходе к полной квалификации имен таблиц может потребоваться обновить определение.

    Замечание

    Удалите или закомментируйте любой поток с добавлением один раз (в Python, запросы с append_flow(однократно=True), в SQL, запросы с INSERT INTO ONCE) в коде целевого конвейера. Дополнительные сведения см. в разделе "Ограничения".

Перемещение завершено. Теперь можно запускать как исходные, так и конечные конвейеры. Конечный конвейер обновляет таблицу.

Устранение неполадок

В следующей таблице описываются ошибки, которые могут произойти при перемещении таблицы между конвейерами.

Ошибка Description
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE Исходный конвейер находится в режиме публикации по умолчанию, а назначение использует режим динамической схемы (устаревшей версии). Это не поддерживается. Если источник использует режим публикации по умолчанию, то и назначение должно его использовать.
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE Поддерживается только перемещение таблиц между конвейерами. Перемещение таблиц потоковой передачи и материализованных представлений, созданных с помощью Databricks SQL, не поддерживается.
DESTINATION_PIPELINE_NOT_FOUND Должен pipelines.pipelineId быть допустимым конвейером. pipelineId не может быть null.
После перемещения таблица не обновляется в конечной точке. Чтобы быстро устранить эту проблему, переместите таблицу обратно в исходный конвейер, выполнив те же инструкции.
PIPELINE_PERMISSION_DENIED_NOT_OWNER Исходные и конечные конвейеры должны принадлежать пользователю, выполняющим операцию перемещения.
TABLE_ALREADY_EXISTS Таблица, указанная в сообщении об ошибке, уже существует. Это может произойти, если резервная таблица для конвейера уже существует. В этом случае таблица DROP указанная в ошибке.

Пример с несколькими таблицами в конвейере

Конвейеры могут содержать более одной таблицы. По-прежнему можно перемещать одну таблицу между конвейерами. В этом сценарии существует три таблицы (table_a, table_b, ), table_cкоторые считываются друг от друга последовательно в исходном конвейере. Мы хотим переместить одну таблицу, table_b, в другой конвейер.

Исходный код конвейера обработки данных:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Table to be moved to new pipeline:
@dp.table
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Мы перемещаем table_b в другой конвейер, копируя и удаляя определение таблицы из исходного источника и обновляя идентификатор конвейера table_b.

Во-первых, приостанавливайте все расписания и дождитесь завершения обновлений в исходных и целевых конвейерах. Затем измените исходный конвейер, чтобы удалить код для перемещаемой таблицы. Обновленный пример исходного конвейера становится следующим:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Removed, to be in new pipeline:
# @dp.table
# def table_b():
#     return (
#         spark.read.table("table_a")
#         .select(col("column1"), col("column2"))
#     )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Перейдите в редактор SQL, чтобы выполнить ALTER pipelineId команду.

ALTER MATERIALIZED VIEW table_b
  SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");

Затем перейдите к целевому конвейеру и добавьте определение table_b. Если каталог и схема по умолчанию совпадают в параметрах конвейера, изменения кода не требуются.

Код целевого конвейера:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="table_b")
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

Если каталог и схема по умолчанию в настройках конвейера отличаются, вы должны использовать полностью квалифицированное имя с каталогом и схемой конвейера.

Например, код целевого конвейера может быть следующим:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
    return (
        spark.read.table("source_catalog.source_schema.table_a")
        .select(col("column1"), col("column2"))
    )

Запустите (или повторно включите расписания) для исходных и целевых конвейеров.

Теперь конвейеры разъединены. Запрос table_c читает из table_b (теперь в целевом конвейере), а table_b читает из table_a (в исходном конвейере). При выполнении триггерного выполнения в исходном конвейере table_b не обновляется, так как он больше не управляется исходным конвейером. Исходный конвейер рассматривает table_b как таблицу вне конвейера. Это сравнимо с определением материализованного представления, которое считывается из таблицы Delta в каталоге Unity и не управляется конвейером обработки данных.

Ограничения

Ниже приведены ограничения для перемещения таблиц между конвейерами.

  • Материализованные представления и таблицы потоковой передачи, созданные с помощью Databricks SQL, не поддерживаются.
  • Однократное добавление в потоки — потоки Python append_flow(once=True) и потоки SQL INSERT INTO ONCE — не поддерживаются. Их состояние выполнения не сохраняется, они могут быть запущены снова в целевом трубопроводе. Удалите или закомментируйте однократно добавляемые потоки в целевом канале, чтобы избежать повторного выполнения этих потоков.
  • Частные таблицы или представления не поддерживаются.
  • Исходные и конечные конвейеры должны быть конвейерами. Конвейеры NULL не поддерживаются.
  • Исходные и конечные конвейеры должны находиться в одной рабочей области или в разных рабочих областях, которые совместно используют одно и то же хранилище метаданных.
  • Исходные и назначенные конвейеры должны принадлежать пользователю, выполняющему операцию перемещения.
  • Если исходный конвейер использует режим публикации по умолчанию, конечный конвейер также должен использовать режим публикации по умолчанию. Невозможно переместить таблицу из конвейера с помощью режима публикации по умолчанию в конвейер, использующий схему LIVE (устаревшую). См. LIVE схему (устаревшую версию).
  • Если исходные и конечные конвейеры используют схему LIVE (устаревшую версию), они должны иметь одинаковые catalog значения и target значения в параметрах.