Teilen über


Verschieben von Tabellen zwischen Pipelines

In diesem Artikel wird beschrieben, wie Sie Streamingtabellen und materialisierte Ansichten zwischen Pipelines verschieben. Nach der Verschiebung aktualisiert die Pipeline, zu der Sie den Flow verschieben, die Tabelle, und nicht die ursprüngliche Pipeline. Dies ist in vielen Szenarien hilfreich, einschließlich:

  • Teilen Sie eine große Pipeline in kleinere.
  • Mehrere Pipelines zu einer einzigen größeren zusammenführen.
  • Ändern Sie die Aktualisierungshäufigkeit für einige Tabellen in einer Pipeline.
  • Verschieben Sie Tabellen aus einer Pipeline, die den Veröffentlichungsmodus im Legacy-Modus verwendet, in den Standardveröffentlichungsmodus. Ausführliche Informationen zum Legacy-Veröffentlichungsmodus finden Sie im Legacy-Veröffentlichungsmodus für Pipelines. Informationen dazu, wie Sie den Veröffentlichungsmodus für eine gesamte Pipeline gleichzeitig migrieren können, finden Sie unter Aktivieren des Standardveröffentlichungsmodus in einer Pipeline.
  • Verschieben von Tabellen über Pipelines in verschiedenen Arbeitsbereichen

Anforderungen

Im Folgenden sind die Anforderungen für das Verschieben einer Tabelle zwischen Pipelines aufgeführt.

  • Sie müssen Databricks Runtime 16.3 oder höher verwenden, wenn Sie den ALTER ... Befehl ausführen, und Databricks Runtime 17.2 für die arbeitsbereichübergreifende Tabellenverschiebung.

  • Sowohl Quell- als auch Zielpipelines müssen folgendes sein:

    • Eigentum des Azure Databricks-Benutzerkontos oder des Dienstprinzipals, der den Vorgang ausführt
    • In Arbeitsbereichen, die einen Metaspeicher gemeinsam nutzen. Informationen zum Überprüfen des Metastores finden Sie unter current_metastore "Funktion".
  • Die Zielpipeline muss den Standardveröffentlichungsmodus verwenden. Auf diese Weise können Sie Tabellen in mehreren Katalogen und Schemas veröffentlichen.

    Alternativ müssen beide Pipelines den Legacy-Veröffentlichungsmodus verwenden, und beide müssen denselben Katalog- und Zielwert in den Einstellungen aufweisen. Informationen zum legacy-Veröffentlichungsmodus finden Sie unter LIVE-Schema (Legacy).

Hinweis

Dieses Feature unterstützt das Verschieben einer Pipeline von einem standardmäßigen Veröffentlichungsmodus zu einem älteren Veröffentlichungsmodus nicht.

Verschieben einer Tabelle zwischen Pipelines

Die folgenden Anweisungen beschreiben, wie Sie eine Streamingtabelle oder materialisierte Ansicht von einer Pipeline in eine andere verschieben.

  1. Halten Sie die Quell-Pipeline an, wenn sie gerade ausgeführt wird. Warten Sie, bis es vollständig zum Stillstand gekommen ist.

  2. Entfernen Sie die Definition der Tabelle aus dem Code der Quellpipeline, und speichern Sie sie an einer beliebigen Stelle für zukünftige Verweise.

    Schließen Sie alle unterstützenden Abfragen oder Code ein, die für die ordnungsgemäße Ausführung der Pipeline erforderlich sind.

  3. Führen Sie in einem Notizbuch oder einem SQL-Editor den folgenden SQL-Befehl aus, um die Tabelle aus der Quellpipeline der Zielpipeline neu zuzuweisen:

    ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
      SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");
    

    Beachten Sie, dass der SQL-Befehl aus dem Arbeitsbereich der Quellpipeline ausgeführt werden muss.

    Der Befehl verwendet ALTER MATERIALIZED VIEW und ALTER STREAMING TABLE für Einheits-Katalog verwaltete materialisierte visions bzw. Streaming-Tabellen vor. Um dieselbe Aktion für eine Hive-Metastore-Tabelle auszuführen, verwenden Sie ALTER TABLE.

    Wenn Sie beispielsweise eine Streamingtabelle sales mit der ID abcd1234-ef56-ab78-cd90-1234efab5678in eine Pipeline verschieben möchten, führen Sie den folgenden Befehl aus:

    ALTER STREAMING TABLE sales
      SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
    

    Hinweis

    Die pipelineId muss ein gültiger Pipeline-Bezeichner sein. Der Wert null ist nicht zulässig.

  4. Fügen Sie die Definition der Tabelle dem Code der Zielpipeline hinzu.

    Hinweis

    Wenn das Katalog- oder Zielschema zwischen der Quelle und dem Ziel unterschiedlich ist, kann das exakte Kopieren der Abfrage möglicherweise nicht funktionieren. Teilweise qualifizierte Tabellen in der Definition können unterschiedlich aufgelöst werden. Möglicherweise müssen Sie die Definition aktualisieren, während Sie die Tabellennamen vollständig qualifizieren.

    Hinweis

    Entfernen oder kommentieren Sie alle Append-Once-Flows (in Python, Abfragen mit append_flow(once=True), in SQL, Abfragen mit INSERT INTO ONCE) aus dem Code der Zielpipeline. Weitere Informationen finden Sie unter "Einschränkungen".

Der Umzug ist abgeschlossen. Sie können jetzt sowohl die Quell- als auch die Zielpipeline ausführen. Die Zielpipeline aktualisiert die Tabelle.

Problembehandlung

In der folgenden Tabelle werden Fehler beschrieben, die beim Verschieben einer Tabelle zwischen Pipelines auftreten können.

Fehler Description
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE Die Quell-Pipeline befindet sich im standardmäßigen Veröffentlichungsmodus, und das Ziel verwendet den Modus LIVE schema (veraltet). Dieser Vorgang wird nicht unterstützt. Wenn die Quelle den Standardmäßigen Veröffentlichungsmodus verwendet, muss auch das Ziel den Standardmäßigen Veröffentlichungsmodus verwenden.
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE Nur das Verschieben von Tabellen zwischen Pipelines wird unterstützt. Das Verschieben von Streamingtabellen und materialisierten Ansichten, die mit Databricks SQL erstellt wurden, werden nicht unterstützt.
DESTINATION_PIPELINE_NOT_FOUND Die pipelines.pipelineId muss eine gültige Pipeline sein. Der Wert pipelineId darf nicht Null sein.
Die Tabelle kann nach der Verschiebung nicht im Ziel aktualisiert werden. Zur schnellen Behebung dieses Problems verschieben Sie die Tabelle zurück in die Pipeline der Quelle, indem Sie die gleichen Anweisungen befolgen.
PIPELINE_PERMISSION_DENIED_NOT_OWNER Sowohl die Quell- als auch die Zielpipeline müssen dem Benutzer gehören, der den Verschiebungsvorgang ausführt.
TABLE_ALREADY_EXISTS Die in der Fehlermeldung aufgeführte Tabelle ist bereits vorhanden. Dies kann passieren, wenn bereits eine Sicherungstabelle für die Pipeline vorhanden ist. In diesem Fall ist DROP die Tabelle, die im Fehler aufgelistet ist.

Beispiel für mehrere Tabellen in einer Pipeline

Pipelines können mehr als eine Tabelle enthalten. Sie können immer noch eine Tabelle nach der anderen zwischen den Pipelines verschieben. In diesem Szenario gibt es drei Tabellen (table_a, table_b, table_c), die in der Quellpipeline sequenziell voneinander gelesen werden. Wir wollen eine Tabelle verschieben, table_b zu einer anderen Pipeline.

Ursprünglicher Pipeline-Quellcode.

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Table to be moved to new pipeline:
@dp.table
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Wir verschieben table_b in eine andere Pipeline, indem wir die Tabellendefinition aus der Quelle kopieren und anschließend entfernen und die PipelineId von table_b aktualisieren.

Halten Sie zunächst alle Abläufe an, und warten Sie, bis Updates sowohl für die Quell- als auch die Zielpipeline abgeschlossen sind. Ändern Sie dann die Quellpipeline, um Code für die verschobene Tabelle zu entfernen. Der aktualisierte Beispielcode für die Quellpipeline wird:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Removed, to be in new pipeline:
# @dp.table
# def table_b():
#     return (
#         spark.read.table("table_a")
#         .select(col("column1"), col("column2"))
#     )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Wechseln Sie zum SQL-Editor, um den ALTER pipelineId Befehl auszuführen.

ALTER MATERIALIZED VIEW table_b
  SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");

Gehen Sie dann zur Ziel-Pipeline und fügen Sie die Definition von table_b. Wenn der Standardkatalog und das Standardschema in den Pipelineeinstellungen identisch sind, sind keine Codeänderungen erforderlich.

Der Zielpipelinecode:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="table_b")
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

Wenn der Standardkatalog und das Standardschema in den Pipeline-Einstellungen abweichen, müssen Sie den voll qualifizierten Namen unter Verwendung des Katalogs und Schemas der Pipeline hinzufügen.

Der Zielpipelinecode kann beispielsweise wie unten dargestellt sein:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
    return (
        spark.read.table("source_catalog.source_schema.table_a")
        .select(col("column1"), col("column2"))
    )

Ausführung (oder erneute Aktivierung von Zeitplänen) sowohl für die Quell- als auch für die Zielpipelines.

Die Pipelines sind jetzt getrennt. Die Abfrage für table_c liest von table_b (jetzt in der Zielpipeline) und table_b liest von table_a (in der Quell-Pipeline). Wenn Sie eine ausgelöste Ausführung in der Quellpipeline durchführen, wird table_b nicht aktualisiert, da es nicht mehr von der Quellpipeline verwaltet wird. Die Quellpipeline behandelt table_b als eine Tabelle außerhalb der Pipeline. Dies ist vergleichbar mit der Definition einer materialisierten Ansicht, die aus einer Delta-Tabelle in Einheits-Katalog gelesen wird, die nicht von der Pipeline verwaltet wird.

Einschränkungen

Im Folgenden sind Einschränkungen für das Verschieben von Tabellen zwischen Pipelines aufgeführt.

  • Materialisierte Ansichten und Streamingtabellen, die mit Databricks SQL erstellt wurden, werden nicht unterstützt.
  • Das einmalige Anhängeflüsse – Python append_flow(once=True) Flüsse und SQL INSERT INTO ONCE Flüsse – werden nicht unterstützt. Ihr Ausführungsstatus wird nicht beibehalten, sie können in der Zielpipeline erneut ausgeführt werden. Entfernen oder kommentieren Sie einmal angehängte Verarbeitungsschritte aus der Zielpipeline, um zu vermeiden, dass diese Schritte erneut ausgeführt werden.
  • Private Tabellen oder Ansichten werden nicht unterstützt.
  • Die Quell- und Zielpipelines müssen Pipelines sein. Null-Pipelines werden nicht unterstützt.
  • Quell- und Zielpipelinen müssen sich entweder im selben Arbeitsbereich oder in verschiedenen Arbeitsbereichen befinden, die denselben Metaspeicher gemeinsam nutzen.
  • Quell- und Zielpipeline müssen dem Benutzer gehören, der den Verschiebungsvorgang ausführt.
  • Wenn die Quellpipeline den Standardveröffentlichungsmodus verwendet, muss die Zielpipeline auch den Standardveröffentlichungsmodus verwenden. Sie können eine Tabelle nicht aus einer Pipeline mithilfe des Standardveröffentlichungsmodus in eine Pipeline verschieben, die das LIVE-Schema (Legacy) verwendet. Weitere Informationen finden Sie unter LIVE-Schema (Legacy).
  • Wenn sowohl die Quell- als auch die Zielpipeline das LIVE-Schema (Legacy) verwenden, müssen sie dieselben catalog Werte in target den Einstellungen aufweisen.