Dela via


Flytta tabeller mellan pipelines

Artikeln beskriver hur man flyttar strömmande tabeller och materialiserade vyer mellan pipelines. Efter flytten uppdaterar den pipeline som du flyttar flödet till tabellen i stället för den ursprungliga pipelinen. Detta är användbart i många scenarier, bland annat:

  • Dela upp en stor pipeline i mindre.
  • Sammanfoga flera pipelines till en enda större pipeline.
  • Ändra uppdateringsfrekvensen för vissa tabeller i en pipeline.
  • Flytta tabeller från en pipeline som använder det äldre publiceringsläget till standardpubliceringsläget. Mer information om det äldre publiceringsläget finns i Äldre publiceringsläge för pipelines. Information om hur du kan migrera publiceringsläget för en hel pipeline på en gång finns i Aktivera standardpubliceringsläget i en pipeline.
  • Flytta tabeller mellan dataflöden i olika arbetsytor.

Kravspecifikation

De följande är kraven för att flytta en tabell mellan processflöden.

  • Du måste använda Databricks Runtime 16.3 eller senare när du kör ALTER ... kommandot och Databricks Runtime 17.2 för tabellflyttning mellan arbetsytor.

  • Både käll- och målpipelines måste vara:

    • Ägs av Azure Databricks-användarkontot eller tjänstens huvudentitet som kör operationen
    • I arbetsytor som delar ett metaarkiv. Information om hur du kontrollerar metaarkivet finns i current_metastore funktionen.
  • Målpipelinen måste använda standardläget för publicering. På så sätt kan du publicera tabeller till flera kataloger och scheman.

    Alternativt måste båda pipelines använda det äldre publiceringsläget och båda måste ha samma katalog- och målvärde i inställningarna. Information om det äldre publiceringsläget finns i LIVE-schema (äldre).

Anmärkning

Den här funktionen har inte stöd för att flytta en pipeline med standardpubliceringsläget till en pipeline med det äldre publiceringsläget.

Flytta en tabell mellan pipelines

Följande instruktioner beskriver hur du flyttar en strömmande tabell eller materialiserad vy från en pipeline till en annan.

  1. Stoppa källpipelinen om den körs. Vänta tills den har stannat helt.

  2. Ta bort tabellens definition från källpipelinens kod och lagra den någonstans för framtida referens.

    Inkludera eventuella stödjande frågor eller kod som behövs för att pipelinen ska kunna köras korrekt.

  3. Från en notebook-fil eller en SQL-redigerare kör du följande SQL-kommando för att omtilldela tabellen från källpipelinen till målpipelinen:

    ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
      SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");
    

    Observera att SQL-kommandot måste köras från källpipelinens arbetsyta.

    Kommandot använder ALTER MATERIALIZED VIEW och ALTER STREAMING TABLE för hanterade materialiserade vyer och strömmande tabeller i Unity Catalog. Om du vill utföra samma åtgärd i en Hive-metaarkivtabell använder du ALTER TABLE.

    Om du till exempel vill flytta en strömmande tabell med namnet sales till en pipeline med ID abcd1234-ef56-ab78-cd90-1234efab5678:t kör du följande kommando:

    ALTER STREAMING TABLE sales
      SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
    

    Anmärkning

    pipelineId Måste vara en giltig pipelineidentifierare. Värdet null är inte tillåtet.

  4. Lägg till tabellens definition i målpipelinens kod.

    Anmärkning

    Om katalogen eller målschemat skiljer sig åt mellan källan och målet kanske det inte fungerar att kopiera frågan exakt. Delvis kvalificerade tabeller i definitionen kan tolkas på olika sätt. Du kan behöva uppdatera definitionen när du flyttar för att fullständigt kvalificera tabellnamnen.

    Anmärkning

    Ta bort eller kommentera ut eventuella append once-funktioner i målpipeledningens kod (i Python, frågor, med append_flow(once=True), i SQL, frågor, med INSERT INTO ONCE). Mer information finns i Begränsningar.

Flytten är klar. Nu kan du köra både käll- och målpipelines. Målpipelinen uppdaterar tabellen.

Felsökning

I följande tabell beskrivs fel som kan inträffa när du flyttar en tabell mellan pipelines.

Error Description
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE Källpipelinen är i standard publiceringsläge, och destinationsplatsen använder LIVE-schemaläge (legacy). Detta stöds inte. Om källan använder standardpubliceringsläget måste även målet göra det.
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE Endast att flytta tabeller mellan pipelines är möjligt. Flytt av strömmande tabeller och materialiserade vyer som skapats med Databricks SQL stöds inte.
DESTINATION_PIPELINE_NOT_FOUND pipelines.pipelineId måste vara en giltig pipeline. Får pipelineId inte vara null.
Tabellen misslyckas med att uppdatera i destinationen efter flytten. För att snabbt åtgärda flyttar du tillbaka tabellen till källpipelinen enligt samma instruktioner.
PIPELINE_PERMISSION_DENIED_NOT_OWNER Både käll- och målpipelines måste ägas av användaren som utför flyttåtgärden.
TABLE_ALREADY_EXISTS Tabellen som visas i felmeddelandet finns redan. Detta kan inträffa om det redan finns en stödtabell för pipelinen. I det här fallet DROP anges tabellen i felet.

Exempel med flera tabeller i en pipeline

Pipelines kan innehålla mer än en tabell. Du kan fortfarande flytta en tabell i taget mellan pipelines. I det här scenariot finns det tre tabeller (table_a, table_b, table_c) som läser från varandra sekventiellt i källpipelinen. Vi vill flytta en tabell, table_b, till en annan pipeline.

Inledande källkodspipelinekod:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Table to be moved to new pipeline:
@dp.table
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Vi flyttar table_b till en annan pipeline genom att kopiera och ta bort tabelldefinitionen från källan och uppdatera table_bpipelineId.

Pausa först alla scheman och vänta tills uppdateringarna har slutförts på både käll- och målpipelines. Ändra sedan källpipelinen för att ta bort koden för tabellen som flyttas. Den uppdaterade exempelkoden för källpipelinen blir:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Removed, to be in new pipeline:
# @dp.table
# def table_b():
#     return (
#         spark.read.table("table_a")
#         .select(col("column1"), col("column2"))
#     )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Gå till SQL-redigeraren för att köra ALTER pipelineId kommandot.

ALTER MATERIALIZED VIEW table_b
  SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");

Gå sedan till målpipelinen och lägg till definitionen av table_b. Om standardkatalogen och schemat är samma i pipelineinställningarna krävs inga kodändringar.

Målpipelinekoden:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="table_b")
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

Om standardkatalogen och schemat skiljer sig åt i pipelineinställningarna måste du ange det fullt kvalificerade namnet utifrån pipelinens katalog och schema.

Till exempel kan målpipelinekoden vara:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
    return (
        spark.read.table("source_catalog.source_schema.table_a")
        .select(col("column1"), col("column2"))
    )

Kör (eller återaktivera scheman) för både käll- och målpipelines.

Pipelines är nu uppdelade. Frågan för table_c läser från table_b (nu i mål-pipeline) och table_b läser från table_a (i käll-pipeline). När du utför en triggad körning på källpipelinen uppdateras inte table_b eftersom den inte längre hanteras av källpipelinen. Källpipelinen behandlar table_b som en tabell utanför pipelinen. Detta kan jämföras med att definiera en materialiserad vyläsning från en Delta-tabell i Unity Catalog som inte hanteras av pipelinen.

Begränsningar

Följande är begränsningar vid att flytta tabeller mellan pipelines.

  • Materialiserade vyer och strömmande tabeller som skapats med Databricks SQL stöds inte.
  • Flöden som endast läggs till en gång – Python append_flow(once=True)-flöden och SQL-flöden med INSERT INTO ONCE – stöds inte. Deras körningsstatus bevaras inte, och de kan köras igen i destinationspipen. Ta bort eller kommentera ut "append once flows" från destinationspipelinan för att undvika att köra dessa flöden igen.
  • Privata tabeller eller vyer stöds inte.
  • Käll- och målpipelines måste vara pipelines. Null-pipelines stöds inte.
  • Käll- och målpipelines måste antingen finnas på samma arbetsyta eller i olika arbetsytor som delar samma metaarkiv.
  • Både käll- och målpipelines måste ägas av användaren som kör flyttoperationen.
  • Om källpipelinen använder standardpubliceringsläget måste målpipelinen också använda standardpubliceringsläget. Du kan inte flytta en tabell från en pipeline med standardpubliceringsläget till en pipeline som använder LIVE-schemat (äldre). Se LIVE-schemat (äldre).
  • Om både käll- och målpipelines använder LIVE-schemat (äldre) måste de ha samma catalog värden och target värden i inställningarna.