Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tento článek popisuje, jak přesouvat streamované tabulky a materializovaná zobrazení mezi kanály. Po přesunu je tok přesunut do jiného kanálu, který aktualizuje tabulku místo původního kanálu. To je užitečné v mnoha scénářích, mezi které patří:
- Rozdělte velký kanál na menší kanály.
- Sloučit více potrubí do jednoho většího potrubí.
- Změňte frekvenci aktualizace některých tabulek v datovém kanálu.
- Přesuňte tabulky z kanálu, který používá starší režim publikování, do výchozího režimu publikování. Podrobnosti o starším režimu publikování najdete v tématu Režim starší verze publikování pro kanály. Pokud chcete zjistit, jak můžete migrovat režim publikování pro celý kanál najednou, přečtěte si téma Povolení výchozího režimu publikování v kanálu.
- Přesun tabulek mezi datovými toky v různých prostředích.
Požadavky
Níže jsou uvedené požadavky na přesun tabulky mezi kanály.
Při spuštění
ALTER ...příkazu musíte použít Databricks Runtime 16.3 nebo vyšší a Databricks Runtime 17.2 pro přesun mezi tabulkami pracovních prostorů.Zdrojové i cílové potrubí musí být v pracovních prostorech, které sdílejí metastore. Pokud chcete zkontrolovat metastore, podívejte se na
current_metastorefunkci.Uživatelský účet nebo služba s přístupovými právy, na kterých je operace spuštěná, musí být spuštěna jako uživatel jak zdrojového, tak i cílového potrubí.
Cílový kanál musí používat výchozí režim publikování. To umožňuje publikovat tabulky do více katalogů a schémat.
Alternativně musí oba kanály používat starší režim publikování a oba musí mít stejný katalog a cílovou hodnotu v nastavení. Informace o starším režimu publikování najdete v tématu ŽIVÉ schéma (starší verze).
Poznámka:
Tato funkce nepodporuje přesun kanálu pomocí výchozího režimu publikování do kanálu pomocí starší verze režimu publikování.
Přesun tabulky mezi kanály
Následující pokyny popisují, jak přesunout streamovanou tabulku nebo materializované zobrazení z jednoho kanálu do druhého.
Pokud je spuštěný, zastavte zdrojový kanál. Počkejte, až se úplně zastaví.
Odeberte definici tabulky z kódu zdrojového kanálu a uložte ji někam pro budoucí referenci.
Zahrňte všechny podpůrné dotazy nebo kód, které jsou potřeba ke správnému spuštění kanálu.
Z poznámkového bloku nebo editoru SQL spusťte následující příkaz SQL a znovu přiřaďte tabulku ze zdrojového kanálu k cílovému kanálu:
ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name> SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");Všimněte si, že příkaz SQL musí být spuštěný z pracovního prostoru zdrojového kanálu.
Příkaz používá
ALTER MATERIALIZED VIEWaALTER STREAMING TABLEpro spravovaná materializovaná zobrazení katalogu Unity a tabulky streamování v uvedeném pořadí. Chcete-li provést stejnou akci u tabulky metastoru Hive, použijteALTER TABLE.Pokud například chcete přesunout streamovací tabulku pojmenovanou
salesdo kanálu s IDabcd1234-ef56-ab78-cd90-1234efab5678, spusťte následující příkaz:ALTER STREAMING TABLE sales SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");Poznámka:
Musí
pipelineIdto být platný identifikátor kanálu. Hodnotanullnení povolená.Přidejte definici tabulky do kódu cílového kanálu.
Poznámka:
Pokud se katalog nebo cílové schéma mezi zdrojem a cílem liší, nemusí kopírování dotazu přesně fungovat. Částečně kvalifikované tabulky v definici mohou být vyřešeny různě. Je možné, že budete muset aktualizovat definici při přechodu k plnému kvalifikování názvů tabulek.
Poznámka:
Odeberte nebo zakomentujte všechny přidávací toky (v Pythonu, dotazy s append_flow(once=True), v SQL, dotazy s INSERT INTO ONCE) z kódu cílového kanálu. Další podrobnosti najdete v tématu Omezení.
Přesun je dokončen. Teď můžete spouštět zdrojové i cílové kanály. Cílový datový tok aktualizuje tabulku.
Řešení problémů
Následující tabulka popisuje chyby, ke kterým může dojít při přesouvání tabulky mezi kanály.
| Error | Description |
|---|---|
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE |
Zdrojový kanál je ve výchozím režimu publikování a cíl používá režim ŽIVÉho schématu (starší verze). Tato funkce není podporována. Pokud zdroj používá výchozí režim publikování, musí ho používat i cíl. |
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE |
Podporuje se pouze přesouvání tabulek mezi kanály. Přesun streamovaných tabulek a materializovaných zobrazení vytvořených pomocí Databricks SQL se nepodporuje. |
DESTINATION_PIPELINE_NOT_FOUND |
pipelines.pipelineId musí být platným kanálem. Hodnota pipelineId nemůže být null. |
| Tabulka se po přesunutí neaktualizuje v cíli. | V tomto případě rychle zmírněte situaci tím, že tabulku přesunete zpět do zdrojového kanálu podle stejných pokynů. |
PIPELINE_PERMISSION_DENIED_NOT_OWNER |
Zdrojový i cílový kanál musí vlastnit uživatel provádějící operaci přesunutí. |
TABLE_ALREADY_EXISTS |
Tabulka uvedená v chybové zprávě již existuje. K tomu může dojít v případě, že již existuje podkladová tabulka pro kanál. V tomto případě je DROP tabulka uvedená v chybě. |
Příklad s více tabulkami v datovém toku
Potrubí mohou obsahovat více než jednu tabulku. Mezi kanály můžete stále přesouvat jednu tabulku najednou. V tomto scénáři existují tři tabulky (table_a, table_b, table_c), které se vzájemně čtou postupně ve zdrojovém kanálu. Chceme přesunout jednu tabulku table_b do jiného potrubí.
Počáteční zdrojový kód kanálu:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Table to be moved to new pipeline:
@dp.table
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
Přesuneme table_b do jiného kanálu zkopírováním definice tabulky ze zdroje a jejím odebráním a aktualizací table_b ID kanálu.
Nejprve pozastavte všechny plány a počkejte na dokončení aktualizací ve zdrojovém i cílovém kanálu. Potom upravte zdrojový kanál tak, aby odebral kód pro přesunutou tabulku. Aktualizovaný ukázkový kód zdrojového kanálu je uveden:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Removed, to be in new pipeline:
# @dp.table
# def table_b():
# return (
# spark.read.table("table_a")
# .select(col("column1"), col("column2"))
# )
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
Přejděte do editoru SQL a spusťte příkaz ALTER pipelineId.
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");
Dále přejděte do cílového kanálu a přidejte definici table_b. Pokud je výchozí katalog a schéma stejné v nastavení kanálu, nejsou potřeba žádné změny kódu.
Kód cílového kanálu:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="table_b")
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
Pokud se výchozí katalog a schéma liší v nastavení kanálu, musíte přidat plně kvalifikovaný název pomocí katalogu a schématu kanálu.
Například kód cílového kanálu může být:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
return (
spark.read.table("source_catalog.source_schema.table_a")
.select(col("column1"), col("column2"))
)
Spusťte (nebo znovu povolte plány) pro zdrojové i cílové kanály.
Potrubí jsou teď oddělená. Dotaz na table_c čte z table_b (nyní v cílovém kanálu) a table_b čte z table_a (ve zdrojovém kanálu). Při spuštění aktivovaného ve zdrojovém kanálu table_b se neaktualizuje, protože už ho nespravuje zdrojový kanál. Zdrojový kanál zachází s table_b jako s tabulkou, která je pro kanál externí. To je srovnatelné s definováním materializovaného zobrazení čtení z tabulky Delta v katalogu Unity, která není spravována kanálem.
omezení
Níže jsou uvedená omezení pro přesouvání tabulek mezi kanály.
- Materializovaná zobrazení a streamované tabulky vytvořené pomocí Databricks SQL se nepodporují.
- Toky pro přidávání jen jednou – Python append_flow(once=True) a SQL toky INSERT INTO ONCE – nejsou podporovány. Jejich stav spuštění se nezachová, můžou se v cílovém kanálu spustit znovu. Odstraňte nebo zakomentujte jednorázové přírůstkové toky v cílovém potrubí, abyste zabránili jejich opětovnému spuštění.
- Soukromé tabulky nebo zobrazení nejsou podporovány.
- Zdrojové a cílové procesní linky musí být potrubí. Nulové kanály nejsou podporovány.
- Zdrojové a cílové kanály musí být buď ve stejném pracovním prostoru, nebo v různých pracovních prostorech, které sdílejí stejný metastor.
- Zdrojový i cílový kanál musí vlastnit uživatel, který spouští operaci přesunutí.
- Pokud zdrojový kanál používá výchozí režim publikování, cílový kanál musí také používat výchozí režim publikování. Tabulku nemůžete přesunout z kanálu pomocí výchozího režimu publikování do kanálu, který používá schéma LIVE (starší verze). Viz LIVE schema (starší verze).
- Pokud zdrojové i cílové kanály používají schéma LIVE (starší verze), musí mít v nastavení stejné
cataloghodnoty atargethodnoty.