Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этой статье описывается перемещение потоковых таблиц и материализованных представлений между конвейерами. После перемещения поток в новый конвейер, именно этот новый конвейер обновляет таблицу, а не исходный. Это полезно во многих сценариях, в том числе:
- Разделите большой конвейер на меньшие.
- Объединение нескольких потоков в один более крупный.
- Измените частоту обновления для некоторых таблиц в конвейере.
- Перемещение таблиц из конвейера, использующего устаревший режим публикации, в режим публикации по умолчанию. Дополнительные сведения об устаревшем режиме публикации для конвейеров см. в разделе "Устаревший режим публикации для конвейеров". Чтобы узнать, как можно перенести режим публикации для всего конвейера одновременно, см. раздел "Включить режим публикации по умолчанию" в конвейере.
- Перенесите таблицы между конвейерами в разных рабочих пространствax.
Требования
Ниже приведены требования к перемещению таблицы между конвейерами.
При выполнении
ALTER ...команды необходимо использовать Databricks Runtime 16.3 или более поздней версии и Databricks Runtime 17.2 для перемещения таблицы между рабочими областями.Как исходные, так и конечные конвейеры должны находиться в рабочих областях, которые совместно используют хранилище метаданных. Чтобы проверить хранилище метаданных, см.
current_metastoreфункцию.Учетная запись пользователя или учетная запись службы, выполняющая операцию, должна выполняться от имени пользователя как в исходном, так и в целевом конвейере.
Целевой конвейер должен использовать режим публикации по умолчанию. Это позволяет публиковать таблицы в нескольких каталогах и схемах.
Кроме того, оба конвейера должны использовать устаревший режим публикации, и оба должны иметь одно и то же каталог и целевое значение в параметрах. Сведения о устаревшем режиме публикации см. в схеме LIVE (устаревшая версия).
Замечание
Эта функция не поддерживает перемещение конвейера из режима публикации по умолчанию в конвейер с устаревшим режимом публикации.
Переместить таблицу между конвейерами
В следующих инструкциях описывается перемещение потоковой таблицы или материализованного представления из одного конвейера в другой.
Остановите исходный конвейер, если он запущен. Подождите, пока он полностью остановится.
Удалите определение таблицы из кода исходного конвейера и сохраните ее где-то для дальнейшего использования.
Включите все вспомогательные запросы или код, необходимые для правильного выполнения конвейера.
В записной книжке или редакторе SQL выполните следующую команду SQL, чтобы переназначить таблицу из исходного конвейера в целевой конвейер:
ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name> SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");Обратите внимание, что команда SQL должна выполняться из рабочей области исходного конвейера.
Команда использует
ALTER MATERIALIZED VIEWиALTER STREAMING TABLEдля материализованных представлений, управляемых каталогом Unity, и потоковых таблиц соответственно. Чтобы выполнить то же действие в таблице хранилища метаданных Hive, используйтеALTER TABLE.Например, если вы хотите переместить потоковую таблицу с именем
salesв конвейер с идентификаторомabcd1234-ef56-ab78-cd90-1234efab5678, выполните следующую команду.ALTER STREAMING TABLE sales SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");Замечание
Должен
pipelineIdбыть допустимым идентификатором конвейера. Значениеnullне разрешено.Добавьте определение таблицы в код целевого конвейера.
Замечание
Если схема каталога или целевая схема отличаются между источником и назначением, точное копирование запроса может не работать. Частично квалифицированные таблицы в определении могут трактоваться по-разному. При переходе к полной квалификации имен таблиц может потребоваться обновить определение.
Замечание
Удалите или закомментируйте любой поток с добавлением один раз (в Python, запросы с append_flow(однократно=True), в SQL, запросы с INSERT INTO ONCE) в коде целевого конвейера. Дополнительные сведения см. в разделе "Ограничения".
Перемещение завершено. Теперь можно запускать как исходные, так и конечные конвейеры. Конечный конвейер обновляет таблицу.
Устранение неполадок
В следующей таблице описываются ошибки, которые могут произойти при перемещении таблицы между конвейерами.
| Ошибка | Description |
|---|---|
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE |
Исходный конвейер находится в режиме публикации по умолчанию, а назначение использует режим динамической схемы (устаревшей версии). Это не поддерживается. Если источник использует режим публикации по умолчанию, то и назначение должно его использовать. |
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE |
Поддерживается только перемещение таблиц между конвейерами. Перемещение таблиц потоковой передачи и материализованных представлений, созданных с помощью Databricks SQL, не поддерживается. |
DESTINATION_PIPELINE_NOT_FOUND |
Должен pipelines.pipelineId быть допустимым конвейером.
pipelineId не может быть null. |
| После перемещения таблица не обновляется в конечной точке. | Чтобы быстро устранить эту проблему, переместите таблицу обратно в исходный конвейер, выполнив те же инструкции. |
PIPELINE_PERMISSION_DENIED_NOT_OWNER |
Исходные и конечные конвейеры должны принадлежать пользователю, выполняющим операцию перемещения. |
TABLE_ALREADY_EXISTS |
Таблица, указанная в сообщении об ошибке, уже существует. Это может произойти, если резервная таблица для конвейера уже существует. В этом случае таблица DROP указанная в ошибке. |
Пример с несколькими таблицами в конвейере
Конвейеры могут содержать более одной таблицы. По-прежнему можно перемещать одну таблицу между конвейерами. В этом сценарии существует три таблицы (table_a, table_b, ), table_cкоторые считываются друг от друга последовательно в исходном конвейере. Мы хотим переместить одну таблицу, table_b, в другой конвейер.
Исходный код конвейера обработки данных:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Table to be moved to new pipeline:
@dp.table
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
Мы перемещаем table_b в другой конвейер, копируя и удаляя определение таблицы из исходного источника и обновляя идентификатор конвейера table_b.
Во-первых, приостанавливайте все расписания и дождитесь завершения обновлений в исходных и целевых конвейерах. Затем измените исходный конвейер, чтобы удалить код для перемещаемой таблицы. Обновленный пример исходного конвейера становится следующим:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Removed, to be in new pipeline:
# @dp.table
# def table_b():
# return (
# spark.read.table("table_a")
# .select(col("column1"), col("column2"))
# )
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
Перейдите в редактор SQL, чтобы выполнить ALTER pipelineId команду.
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");
Затем перейдите к целевому конвейеру и добавьте определение table_b. Если каталог и схема по умолчанию совпадают в параметрах конвейера, изменения кода не требуются.
Код целевого конвейера:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="table_b")
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
Если каталог и схема по умолчанию в настройках конвейера отличаются, вы должны использовать полностью квалифицированное имя с каталогом и схемой конвейера.
Например, код целевого конвейера может быть следующим:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
return (
spark.read.table("source_catalog.source_schema.table_a")
.select(col("column1"), col("column2"))
)
Запустите (или повторно включите расписания) для исходных и целевых конвейеров.
Теперь конвейеры разъединены. Запрос table_c читает из table_b (теперь в целевом конвейере), а table_b читает из table_a (в исходном конвейере). При выполнении триггерного выполнения в исходном конвейере table_b не обновляется, так как он больше не управляется исходным конвейером. Исходный конвейер рассматривает table_b как таблицу вне конвейера. Это сравнимо с определением материализованного представления, которое считывается из таблицы Delta в каталоге Unity и не управляется конвейером обработки данных.
Ограничения
Ниже приведены ограничения для перемещения таблиц между конвейерами.
- Материализованные представления и таблицы потоковой передачи, созданные с помощью Databricks SQL, не поддерживаются.
- Однократное добавление в потоки — потоки Python append_flow(once=True) и потоки SQL INSERT INTO ONCE — не поддерживаются. Их состояние выполнения не сохраняется, они могут быть запущены снова в целевом трубопроводе. Удалите или закомментируйте однократно добавляемые потоки в целевом канале, чтобы избежать повторного выполнения этих потоков.
- Частные таблицы или представления не поддерживаются.
- Исходные и конечные конвейеры должны быть конвейерами. Конвейеры NULL не поддерживаются.
- Исходные и конечные конвейеры должны находиться в одной рабочей области или в разных рабочих областях, которые совместно используют одно и то же хранилище метаданных.
- Исходные и назначенные конвейеры должны принадлежать пользователю, выполняющему операцию перемещения.
- Если исходный конвейер использует режим публикации по умолчанию, конечный конвейер также должен использовать режим публикации по умолчанию. Невозможно переместить таблицу из конвейера с помощью режима публикации по умолчанию в конвейер, использующий схему LIVE (устаревшую). См. LIVE схему (устаревшую версию).
- Если исходные и конечные конвейеры используют схему LIVE (устаревшую версию), они должны иметь одинаковые
catalogзначения иtargetзначения в параметрах.