Sdílet prostřednictvím


Přesun tabulek mezi potrubími

Tento článek popisuje, jak přesouvat streamované tabulky a materializovaná zobrazení mezi kanály. Po přesunu je tok přesunut do jiného kanálu, který aktualizuje tabulku místo původního kanálu. To je užitečné v mnoha scénářích, mezi které patří:

  • Rozdělte velký kanál na menší kanály.
  • Sloučit více potrubí do jednoho většího potrubí.
  • Změňte frekvenci aktualizace některých tabulek v datovém kanálu.
  • Přesuňte tabulky z kanálu, který používá starší režim publikování, do výchozího režimu publikování. Podrobnosti o starším režimu publikování najdete v tématu Režim starší verze publikování pro kanály. Pokud chcete zjistit, jak můžete migrovat režim publikování pro celý kanál najednou, přečtěte si téma Povolení výchozího režimu publikování v kanálu.
  • Přesun tabulek mezi datovými toky v různých prostředích.

Požadavky

Níže jsou uvedené požadavky na přesun tabulky mezi kanály.

  • Při spuštění ALTER ... příkazu musíte použít Databricks Runtime 16.3 nebo vyšší a Databricks Runtime 17.2 pro přesun mezi tabulkami pracovních prostorů.

  • Zdrojové i cílové potrubí musí být v pracovních prostorech, které sdílejí metastore. Pokud chcete zkontrolovat metastore, podívejte se na current_metastore funkci.

  • Uživatelský účet nebo služba s přístupovými právy, na kterých je operace spuštěná, musí být spuštěna jako uživatel jak zdrojového, tak i cílového potrubí.

  • Cílový kanál musí používat výchozí režim publikování. To umožňuje publikovat tabulky do více katalogů a schémat.

    Alternativně musí oba kanály používat starší režim publikování a oba musí mít stejný katalog a cílovou hodnotu v nastavení. Informace o starším režimu publikování najdete v tématu ŽIVÉ schéma (starší verze).

Poznámka:

Tato funkce nepodporuje přesun kanálu pomocí výchozího režimu publikování do kanálu pomocí starší verze režimu publikování.

Přesun tabulky mezi kanály

Následující pokyny popisují, jak přesunout streamovanou tabulku nebo materializované zobrazení z jednoho kanálu do druhého.

  1. Pokud je spuštěný, zastavte zdrojový kanál. Počkejte, až se úplně zastaví.

  2. Odeberte definici tabulky z kódu zdrojového kanálu a uložte ji někam pro budoucí referenci.

    Zahrňte všechny podpůrné dotazy nebo kód, které jsou potřeba ke správnému spuštění kanálu.

  3. Z poznámkového bloku nebo editoru SQL spusťte následující příkaz SQL a znovu přiřaďte tabulku ze zdrojového kanálu k cílovému kanálu:

    ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
      SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");
    

    Všimněte si, že příkaz SQL musí být spuštěný z pracovního prostoru zdrojového kanálu.

    Příkaz používá ALTER MATERIALIZED VIEW a ALTER STREAMING TABLE pro spravovaná materializovaná zobrazení katalogu Unity a tabulky streamování v uvedeném pořadí. Chcete-li provést stejnou akci u tabulky metastoru Hive, použijte ALTER TABLE.

    Pokud například chcete přesunout streamovací tabulku pojmenovanou sales do kanálu s ID abcd1234-ef56-ab78-cd90-1234efab5678, spusťte následující příkaz:

    ALTER STREAMING TABLE sales
      SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
    

    Poznámka:

    Musí pipelineId to být platný identifikátor kanálu. Hodnota null není povolená.

  4. Přidejte definici tabulky do kódu cílového kanálu.

    Poznámka:

    Pokud se katalog nebo cílové schéma mezi zdrojem a cílem liší, nemusí kopírování dotazu přesně fungovat. Částečně kvalifikované tabulky v definici mohou být vyřešeny různě. Je možné, že budete muset aktualizovat definici při přechodu k plnému kvalifikování názvů tabulek.

    Poznámka:

    Odeberte nebo zakomentujte všechny přidávací toky (v Pythonu, dotazy s append_flow(once=True), v SQL, dotazy s INSERT INTO ONCE) z kódu cílového kanálu. Další podrobnosti najdete v tématu Omezení.

Přesun je dokončen. Teď můžete spouštět zdrojové i cílové kanály. Cílový datový tok aktualizuje tabulku.

Řešení problémů

Následující tabulka popisuje chyby, ke kterým může dojít při přesouvání tabulky mezi kanály.

Error Description
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE Zdrojový kanál je ve výchozím režimu publikování a cíl používá režim ŽIVÉho schématu (starší verze). Tato funkce není podporována. Pokud zdroj používá výchozí režim publikování, musí ho používat i cíl.
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE Podporuje se pouze přesouvání tabulek mezi kanály. Přesun streamovaných tabulek a materializovaných zobrazení vytvořených pomocí Databricks SQL se nepodporuje.
DESTINATION_PIPELINE_NOT_FOUND pipelines.pipelineId musí být platným kanálem. Hodnota pipelineId nemůže být null.
Tabulka se po přesunutí neaktualizuje v cíli. V tomto případě rychle zmírněte situaci tím, že tabulku přesunete zpět do zdrojového kanálu podle stejných pokynů.
PIPELINE_PERMISSION_DENIED_NOT_OWNER Zdrojový i cílový kanál musí vlastnit uživatel provádějící operaci přesunutí.
TABLE_ALREADY_EXISTS Tabulka uvedená v chybové zprávě již existuje. K tomu může dojít v případě, že již existuje podkladová tabulka pro kanál. V tomto případě je DROP tabulka uvedená v chybě.

Příklad s více tabulkami v datovém toku

Potrubí mohou obsahovat více než jednu tabulku. Mezi kanály můžete stále přesouvat jednu tabulku najednou. V tomto scénáři existují tři tabulky (table_a, table_b, table_c), které se vzájemně čtou postupně ve zdrojovém kanálu. Chceme přesunout jednu tabulku table_b do jiného potrubí.

Počáteční zdrojový kód kanálu:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Table to be moved to new pipeline:
@dp.table
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Přesuneme table_b do jiného kanálu zkopírováním definice tabulky ze zdroje a jejím odebráním a aktualizací table_b ID kanálu.

Nejprve pozastavte všechny plány a počkejte na dokončení aktualizací ve zdrojovém i cílovém kanálu. Potom upravte zdrojový kanál tak, aby odebral kód pro přesunutou tabulku. Aktualizovaný ukázkový kód zdrojového kanálu je uveden:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Removed, to be in new pipeline:
# @dp.table
# def table_b():
#     return (
#         spark.read.table("table_a")
#         .select(col("column1"), col("column2"))
#     )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Přejděte do editoru SQL a spusťte příkaz ALTER pipelineId.

ALTER MATERIALIZED VIEW table_b
  SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");

Dále přejděte do cílového kanálu a přidejte definici table_b. Pokud je výchozí katalog a schéma stejné v nastavení kanálu, nejsou potřeba žádné změny kódu.

Kód cílového kanálu:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="table_b")
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

Pokud se výchozí katalog a schéma liší v nastavení kanálu, musíte přidat plně kvalifikovaný název pomocí katalogu a schématu kanálu.

Například kód cílového kanálu může být:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
    return (
        spark.read.table("source_catalog.source_schema.table_a")
        .select(col("column1"), col("column2"))
    )

Spusťte (nebo znovu povolte plány) pro zdrojové i cílové kanály.

Potrubí jsou teď oddělená. Dotaz na table_c čte z table_b (nyní v cílovém kanálu) a table_b čte z table_a (ve zdrojovém kanálu). Při spuštění aktivovaného ve zdrojovém kanálu table_b se neaktualizuje, protože už ho nespravuje zdrojový kanál. Zdrojový kanál zachází s table_b jako s tabulkou, která je pro kanál externí. To je srovnatelné s definováním materializovaného zobrazení čtení z tabulky Delta v katalogu Unity, která není spravována kanálem.

omezení

Níže jsou uvedená omezení pro přesouvání tabulek mezi kanály.

  • Materializovaná zobrazení a streamované tabulky vytvořené pomocí Databricks SQL se nepodporují.
  • Toky pro přidávání jen jednou – Python append_flow(once=True) a SQL toky INSERT INTO ONCE – nejsou podporovány. Jejich stav spuštění se nezachová, můžou se v cílovém kanálu spustit znovu. Odstraňte nebo zakomentujte jednorázové přírůstkové toky v cílovém potrubí, abyste zabránili jejich opětovnému spuštění.
  • Soukromé tabulky nebo zobrazení nejsou podporovány.
  • Zdrojové a cílové procesní linky musí být potrubí. Nulové kanály nejsou podporovány.
  • Zdrojové a cílové kanály musí být buď ve stejném pracovním prostoru, nebo v různých pracovních prostorech, které sdílejí stejný metastor.
  • Zdrojový i cílový kanál musí vlastnit uživatel, který spouští operaci přesunutí.
  • Pokud zdrojový kanál používá výchozí režim publikování, cílový kanál musí také používat výchozí režim publikování. Tabulku nemůžete přesunout z kanálu pomocí výchozího režimu publikování do kanálu, který používá schéma LIVE (starší verze). Viz LIVE schema (starší verze).
  • Pokud zdrojové i cílové kanály používají schéma LIVE (starší verze), musí mít v nastavení stejné catalog hodnoty a target hodnoty.