共用方式為


在管線之間移動表格

本文說明如何在管線之間移動串流數據表和具體化檢視。 移動之後,您所移動的流程管線會更新資料表,而不是原始管線。 這在許多案例中很有用,包括:

  • 將大型管線分割成較小的管線。
  • 將多個管線合併成一個較大的管線。
  • 變更管線中某些資料表格的更新頻率。
  • 將數據表從使用舊版發佈模式的管線移至預設發佈模式。 如需舊版發佈模式的詳細資訊,請參閱 管線的舊版發佈模式。 若要查看如何一次移轉整個管線的發佈模式,請參閱 在管線中啟用預設發佈模式
  • 在不同工作區中跨管線移動資料表。

需求

以下是在管線之間移動資料表的要求。

  • 執行 ALTER ... 命令時,您必須使用 Databricks Runtime 16.3 或更新版本,並使用 Databricks Runtime 17.2 進行跨工作區資料表移動。

  • 來源和目的地管線都必須是:

    • 由執行作業的 Azure Databricks 使用者帳戶或服務主體所擁有
    • 在共用中繼存放區的工作區中。 若要檢查中繼資料存放區,請參閱 current_metastore 函數
  • 目的地管線必須使用預設發佈模式。 這可讓您將資料表發佈至多個目錄和架構。

    或者,兩個管線都必須使用舊版發佈模式,而且兩者在設定中必須具有相同的目錄和目標值。 如需舊版發佈模式的相關資訊,請參閱 LIVE 結構描述 (舊版)。

備註

此功能不支援使用預設發佈模式將管線移至使用舊版發佈模式的管線。

在管線之間移動資料表

下列指示說明如何將串流數據表或具體化檢視從一個管線移至另一個管線。

  1. 如果來源管線正在執行,請停止運行。 等候它完全停止。

  2. 從來源管線的程式碼中移除資料表的定義,並將其儲存在某個地方以供將來參考。

    包含管線正確執行所需的任何支援查詢或程序代碼。

  3. 從筆記本或 SQL 編輯器中,執行下列 SQL 命令,將資料表從來源管線重新指派至目的地管線:

    ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
      SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");
    

    請注意,SQL 命令必須從來源管線的工作區執行。

    此命令使用 ALTER MATERIALIZED VIEWALTER STREAMING TABLE,分別針對 Unity Catalog 管理的具現化檢視和串流資料表。 若要在 Hive 中繼存放區資料表上執行相同的動作,請使用 ALTER TABLE

    例如,如果您想要將名為 sales 的串流資料表移至ID為 abcd1234-ef56-ab78-cd90-1234efab5678 的管線,您應該執行下列命令:

    ALTER STREAMING TABLE sales
      SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
    

    備註

    pipelineId必須是有效的管線標識碼。 null不允許此值。

  4. 將資料表的定義新增至目的地管線的程式碼。

    備註

    如果來源與目的地的目錄或目標架構不同,直接複製查詢可能無法正常運作。 定義中的部分限定數據表可以不同方式解析。 您可能需要在移動時更新定義,以完全限定表格名稱。

    備註

    從目標管線的程式碼中移除或註解掉任何追加一次性流(在 Python 中,使用 append_flow(once=True) 的查詢,在 SQL 中,使用 INTO ONCE 的INSERT查詢)。 如需詳細資訊,請參閱 限制

移動已完成。 您現在可以同時運行來源和目的地的管道。 目的地管線會更新資料表。

故障排除

下表描述在管線之間移動數據表時可能發生的錯誤。

錯誤 Description
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE 來源管線處於預設發佈模式,而目的地會使用 LIVE 架構(舊版)模式。 不支援此功能。 如果來源使用預設發佈模式,則目的地也必須使用。
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE 僅支援在不同的管線之間移動資料表。 不支援移動使用 Databricks SQL 建立的串流資料表和具體化檢視。
DESTINATION_PIPELINE_NOT_FOUND pipelines.pipelineId必須是有效的管線。 pipelineId不能為 Null。
移動之後,數據表無法在目的地中更新。 若要在此案例中快速緩和,請遵循相同的指示,將數據表移回來源管線。
PIPELINE_PERMISSION_DENIED_NOT_OWNER 執行移動作業的用戶必須擁有來源和目的地管線。
TABLE_ALREADY_EXISTS 錯誤訊息中所列的數據表已經存在。 如果管線的備份數據表已經存在,就會發生這種情況。 在此情況下, DROP 錯誤中列出的數據表。

管線中有多個資料表的範例

管線可以包含多個資料表。 您仍然可以在管線之間一次移動一個資料表。 在此案例中,來源流程中有三個資料表(table_atable_btable_c)循序互相讀取。 我們想要將一個資料表table_b 移至另一個管線。

初始管道程式碼

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Table to be moved to new pipeline:
@dp.table
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

我們先將 table_b 移至另一個管線,再從來源複製並移除資料表定義,然後更新 table_b 的 pipelineId。

首先,暫停任何排程,並等候來源和目標管線的更新完成。 然後修改來源管線,以移除要移動之資料表的程式碼。 更新的來源管線範例程式碼會變成:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Removed, to be in new pipeline:
# @dp.table
# def table_b():
#     return (
#         spark.read.table("table_a")
#         .select(col("column1"), col("column2"))
#     )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

移至 SQL 編輯器以執行 ALTER pipelineId 命令。

ALTER MATERIALIZED VIEW table_b
  SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");

接下來,移至目的地管線,然後新增 table_b 的定義。 如果管線設定中的預設目錄和結構描述相同,則不需要變更程式碼。

目標管線程式碼:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="table_b")
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

如果管道設定中的預設目錄和結構描述不同,您必須使用管道的目錄和結構描述新增完整名稱。

例如,目標管線程式碼可以是:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
    return (
        spark.read.table("source_catalog.source_schema.table_a")
        .select(col("column1"), col("column2"))
    )

執行來源和目標管線的排程(或重新啟用)。

管線現在分離。 查詢 table_ctable_b 讀取(現在位於目標管線中),而 table_btable_a 讀取(位於來源管線中)。 當您在來源管線table_b上執行觸發執行時,它不會更新,因為它不再由來源管線管理。 來源管線會 table_b 視為管線外部的數據表。 這相當於定義從 Unity Catalog 中未由管線管理的 Delta 表讀取的實體化檢視。

限制

以下是在管線之間移動數據表的限制。

  • 不支援使用 Databricks SQL 建立的具體化檢視和串流數據表。
  • 不支援只附加一次的流程—Python append_flow(once=True) 和 SQL INSERT INTO ONCE。 不會保留其執行狀態,它們可能會在目標管線中重新執行。 從目的地管線移除或註解 append once 流程,以避免再次執行這些流程。
  • 不支援專用資料表或檢視。
  • 來源和目的地管線必須是管線。 不支援 Null 管線。
  • 來源和目的地管線必須位於相同的工作區中,或位於共用相同中繼存放區的不同工作區中。
  • 執行移動作業的用戶必須擁有來源和目的地管線。
  • 如果來源管線使用預設發佈模式,目的地管線也必須使用預設發佈模式。 您無法使用預設發佈模式將資料表從管線移至使用 LIVE 架構的管線(舊版)。 請參閱 LIVE 架構(舊版)。
  • 如果來源和目的地管線都使用 LIVE 架構(舊版),則它們必須在設定中具有相同 catalogtarget 值。