Condividi tramite


Spostare tabelle tra pipeline

Questo articolo descrive come spostare tabelle di streaming e viste materializzate tra pipeline. Dopo lo spostamento, la pipeline a cui hai spostato il flusso aggiorna la tabella anziché la pipeline originale. Ciò è utile in molti scenari, tra cui:

  • Suddividere una pipeline di grandi dimensioni in quelle più piccole.
  • Unire più pipeline in un'unica pipeline più grande.
  • Modificare la frequenza di aggiornamento per alcune tabelle in una pipeline.
  • Spostare tabelle da una pipeline che usa la modalità di pubblicazione legacy alla modalità di pubblicazione predefinita. Per informazioni dettagliate sulla modalità di pubblicazione legacy, vedere Modalità di pubblicazione legacy per le pipeline. Per informazioni su come eseguire la migrazione della modalità di pubblicazione per un'intera pipeline contemporaneamente, vedere Abilitare la modalità di pubblicazione predefinita in una pipeline.
  • Spostare tabelle tra pipeline in aree di lavoro diverse.

Requisiti

Di seguito sono riportati i requisiti per lo spostamento di una tabella tra le pipeline.

  • È necessario usare Databricks Runtime 16.3 o versione successiva quando si esegue il ALTER ... comando e Databricks Runtime 17.2 per lo spostamento tra aree di lavoro.

  • Sia le pipeline di origine che di destinazione devono essere:

    • Di proprietà dell'account utente Azure Databricks o del principale del servizio che esegue l'operazione.
    • Nelle aree di lavoro che condividono un metastore. Per controllare il metastore, vedere current_metastore funzione.
  • La pipeline di destinazione deve usare la modalità di pubblicazione predefinita. In questo modo è possibile pubblicare tabelle in più cataloghi e schemi.

    In alternativa, entrambe le pipeline devono usare la modalità di pubblicazione legacy ed entrambi devono avere lo stesso catalogo e lo stesso valore di destinazione nelle impostazioni. Per informazioni sulla modalità di pubblicazione legacy, vedere Schema LIVE (legacy).

Annotazioni

Questa funzionalità non supporta lo spostamento di una pipeline usando la modalità di pubblicazione predefinita in una pipeline usando la modalità di pubblicazione legacy.

Spostare una tabella tra le pipeline

Le istruzioni seguenti descrivono come spostare una tabella di streaming o una vista materializzata da una pipeline a un'altra.

  1. Fermare la pipeline di origine se è in esecuzione. Aspetta che si arresti completamente.

  2. Rimuovere la definizione della tabella dal codice della pipeline di origine e archiviarla in un punto qualsiasi per riferimento futuro.

    Includere tutte le query di supporto o il codice necessario per l'esecuzione corretta della pipeline.

  3. Da un notebook o un editor SQL eseguire il comando SQL seguente per riassegnare la tabella dalla pipeline di origine alla pipeline di destinazione:

    ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
      SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");
    

    Si noti che il comando SQL deve essere eseguito dall'area di lavoro della pipeline di origine.

    Il comando usa ALTER MATERIALIZED VIEW e ALTER STREAMING TABLE rispettivamente per le viste materializzate e le tabelle di streaming gestite dal catalogo Unity. Per eseguire la stessa azione in una tabella metastore Hive, usare ALTER TABLE.

    Ad esempio, se si vuole spostare una tabella di streaming denominata sales in una pipeline con l'ID abcd1234-ef56-ab78-cd90-1234efab5678, eseguire il comando seguente:

    ALTER STREAMING TABLE sales
      SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
    

    Annotazioni

    Il pipelineId deve essere un identificatore di pipeline valido. Il null valore non è consentito.

  4. Aggiungere la definizione della tabella al codice della pipeline di destinazione.

    Annotazioni

    Se lo schema di catalogo o di destinazione è diverso tra l'origine e la destinazione, la copia della query potrebbe non funzionare esattamente. Le tabelle parzialmente qualificate nella definizione possono essere risolte in modo diverso. Potrebbe essere necessario aggiornare la definizione durante lo spostamento per qualificare completamente i nomi delle tabelle.

    Annotazioni

    Rimuovere o commentare qualsiasi flusso di accodamento una tantum (in Python, query con append_flow(once=True), in SQL, query con INSERT INTO ONCE) dal codice della pipeline di destinazione. Per altri dettagli, vedere Limitazioni.

Lo spostamento è completo. È ora possibile eseguire sia le pipeline di origine che di destinazione. La pipeline di destinazione aggiorna la tabella.

Risoluzione dei problemi

Nella tabella seguente vengono descritti gli errori che possono verificarsi durante lo spostamento di una tabella tra le pipeline.

Errore Description
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE La pipeline di origine è in modalità di pubblicazione predefinita e la destinazione usa la modalità dello schema LIVE (legacy). Questo non è supportato. Se l'origine usa la modalità di pubblicazione predefinita, anche la destinazione deve.
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE Lo spostamento di tabelle tra le pipeline è l'unica operazione supportata. Lo spostamento di tabelle di streaming e viste materializzate create con Databricks SQL non è supportato.
DESTINATION_PIPELINE_NOT_FOUND La pipelines.pipelineId deve essere una pipeline valida. Il pipelineId non può essere nullo.
La tabella non viene aggiornata nella destinazione dopo lo spostamento. Per attenuare rapidamente questa situazione, si consiglia di spostare nuovamente la tabella nella pipeline di origine seguendo le stesse istruzioni.
PIPELINE_PERMISSION_DENIED_NOT_OWNER Entrambe le pipeline di origine e di destinazione devono essere di proprietà dell'utente che esegue l'operazione di spostamento.
TABLE_ALREADY_EXISTS La tabella elencata nel messaggio di errore esiste già. Ciò può verificarsi se esiste già una tabella di backup per la pipeline. In questo caso, DROP la tabella elencata nell'errore.

Esempio con più tabelle in una pipeline

Le pipeline possono contenere più tabelle. È comunque possibile spostare una tabella alla volta tra le pipeline. In questo scenario sono presenti tre tabelle (table_a, table_b, table_c) che leggono l'una dall'altra in sequenza nella pipeline di origine. Si vuole spostare una tabella, table_b, in un'altra pipeline.

Codice della pipeline di sorgente iniziale:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Table to be moved to new pipeline:
@dp.table
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Trasferiamo table_b a un'altra pipeline copiando e rimuovendo la definizione di tabella dall'origine e aggiornando table_b il valore pipelineId.

Prima di tutto, sospendere le pianificazioni e attendere il completamento degli aggiornamenti nelle pipeline di origine e di destinazione. Modificare quindi la pipeline di origine per rimuovere il codice per la tabella da spostare. Il codice di esempio della pipeline di origine aggiornato diventa:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Removed, to be in new pipeline:
# @dp.table
# def table_b():
#     return (
#         spark.read.table("table_a")
#         .select(col("column1"), col("column2"))
#     )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Passare all'editor SQL per eseguire il ALTER pipelineId comando .

ALTER MATERIALIZED VIEW table_b
  SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");

Passare quindi alla pipeline di destinazione e aggiungere la definizione di table_b. Se il catalogo e lo schema predefiniti sono gli stessi nelle impostazioni della pipeline, non sono necessarie modifiche al codice.

Il codice della pipeline di destinazione:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="table_b")
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

Se il catalogo e lo schema predefiniti differiscono nelle impostazioni della pipeline, è necessario aggiungere il nome completo usando il catalogo e lo schema della pipeline.

Ad esempio, il codice della pipeline di destinazione potrebbe essere:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
    return (
        spark.read.table("source_catalog.source_schema.table_a")
        .select(col("column1"), col("column2"))
    )

Eseguire (o riabilitare le pianificazioni) per le pipeline di origine e di destinazione.

Le condutture sono ora disgiunte. La query per table_c legge da table_b (ora nella pipeline di destinazione) e table_b legge da table_a (nella pipeline di origine). Quando si esegue un'esecuzione automatica, table_b non viene aggiornato perché non è più gestito dalla pipeline di origine. La pipeline di origine considera table_b come una tabella esterna alla pipeline. Ciò è paragonabile alla definizione di una visualizzazione materializzata che legge da una tabella Delta nel Catalogo Unity, non gestita dalla pipeline.

Limitazioni

Di seguito sono riportate alcune limitazioni per lo spostamento di tabelle tra le pipeline.

  • Le viste materializzate e le tabelle di streaming create con Databricks SQL non sono supportate.
  • I flussi di accodamento una sola volta , ovvero i flussi python append_flow(once=True) e i flussi SQL INSERT INTO ONCE , non sono supportati. I loro stati di esecuzione non vengono mantenuti, e potrebbero essere eseguiti nuovamente nella pipeline di destinazione. Rimuovere o commentare i flussi 'append once' dalla pipeline di destinazione per evitare di eseguire di nuovo questi flussi.
  • Le tabelle o le viste private non sono supportate.
  • Le pipeline di origine e destinazione devono essere effettivamente pipeline. Le pipeline Null non sono supportate.
  • Le pipeline di origine e di destinazione devono trovarsi nella stessa area di lavoro o in aree di lavoro diverse che condividono lo stesso metastore.
  • Sia le pipeline di origine che di destinazione devono essere di proprietà dell'utente che esegue l'operazione di spostamento.
  • Se la pipeline di origine usa la modalità di pubblicazione predefinita, la pipeline di destinazione deve usare anche la modalità di pubblicazione predefinita. Non è possibile spostare una tabella da una pipeline usando la modalità di pubblicazione predefinita in una pipeline che usa lo schema LIVE (legacy). Visualizza lo schema IN DIRETTA (legacy) .
  • Se le pipeline di origine e di destinazione usano entrambi lo schema LIVE (legacy), devono avere gli stessi catalog valori e target nelle impostazioni.