Sdílet prostřednictvím


Postupné načítání a zpracování dat pomocí deklarativních pipeline toků Lakeflow Spark

Data se zpracovávají v kanálech prostřednictvím toků. Každý tok se skládá z dotazu a obvykle cíle. Tok dat zpracovává datový dotaz buď jako dávku, nebo přírůstkově jako proud dat do cílového systému. Tok se nachází v pipeline v Lakeflow Spark deklarativních pipelinech.

Toky se obvykle definují automaticky při vytváření dotazu v kanálu, který aktualizuje cíl, ale můžete také explicitně definovat další toky pro složitější zpracování, například připojení k jednomu cíli z více zdrojů.

Aktualizace

Pokaždé, když se aktualizuje jeho definující potrubí, se tok spustí. Tok vytvoří nebo aktualizuje tabulky s nejnovějšími dostupnými daty. V závislosti na typu toku a stavu změn dat může aktualizace provést přírůstkovou aktualizaci, která zpracovává pouze nové záznamy, nebo provést úplnou aktualizaci, která znovu zpracuje všechny záznamy ze zdroje dat.

Vytvoření výchozího toku

Při vytváření kanálu obvykle definujete tabulku nebo zobrazení spolu s dotazem, který ho podporuje. Například v tomto dotazu SQL vytvoříte streamovací tabulku volanou customers_silver čtením z tabulky s názvem customers_bronze.

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Stejnou streamovací tabulku můžete vytvořit i v Pythonu. V Pythonu použijete pipelines vytvořením funkce dotazu, která vrací datový rámec, s dekorátory pro přidání funkcionality Lakeflow Spark Declarative Pipelines:

from pyspark import pipelines as dp

@dp.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

V tomto příkladu jste vytvořili streamovací tabulku. Můžete také vytvořit materializovaná zobrazení s podobnou syntaxí v SQL i Pythonu. Další informace naleznete v tématu Streamované tabulky a Materializovaná zobrazení.

Tento příklad vytvoří výchozí tok spolu s tabulkou streamování. Výchozí tok pro streamovací tabulku je přidávací tok, který přidává nové řádky s každým triggerem. To je nejběžnější způsob použití kanálů: vytvoření toku a cíle v jednom kroku. Tento styl můžete použít k ingestování dat nebo k transformaci dat.

Doplňovací toky také podporují zpracování, které vyžaduje čtení dat z více streamovaných zdrojů k aktualizaci jednoho cíle. Například funkčnost připojení toku můžete využít, když máte existující streamovací tabulku a chcete přidat nový streamovací zdroj, který zapisuje do této existující streamovací tabulky.

Použití více toků k zápisu do jednoho cíle

V předchozím příkladu jste vytvořili tok a streamovací tabulku v jednom kroku. Toky můžete vytvořit i pro dříve vytvořenou tabulku. V tomto příkladu můžete vidět vytvoření tabulky a toku přidruženého k ní v samostatných krocích. Tento kód má stejné výsledky jako vytvoření výchozího toku, včetně použití stejného názvu pro streamovací tabulku a tok.

Python

from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow
@dp.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

Vytvoření toku nezávisle na cíli znamená, že můžete také vytvořit více toků, které připojují data ke stejnému cíli.

Pomocí dekorátoru v rozhraní Pythonu nebo klauzule @dp.append_flow v rozhraní SQL můžete vytvořit nový tok, například zaměřit se na streamovací tabulku z více zdrojů streamování. Použijte průběžný tok pro zpracování úkolů, jako je například následující:

  • Přidejte streamované zdroje, které připojují data k existující streamované tabulce bez nutnosti úplné aktualizace. Můžete mít například tabulku, která kombinuje regionální data z každé oblasti, ve které pracujete. Při nasazení nových oblastí můžete do tabulky přidat nová data oblasti, aniž byste provedli úplnou aktualizaci. Příklad přidání zdrojů streamování do existující streamovací tabulky najdete v tématu Příklad: Zapisování do streamovací tabulky z několika témat Kafka.
  • Aktualizujte streamovací tabulku přidáním chybějících historických dat (obnovení). Můžete použít syntaxi INSERT INTO ONCE k vytvoření historického doplnění, které se spustí pouze jednou. Máte například existující streamovací tabulku napsanou tématem Apache Kafka. Máte také historická data uložená v tabulce, kterou potřebujete vložit přesně jednou do streamované tabulky, a nemůžete je streamovat, protože zpracování zahrnuje provedení komplexní agregace před vložením dat. Příklad zpětného doplnění dat najdete v tématu Obnovení historických dat pomocí datových toků.
  • Kombinovat data z více zdrojů a zapisovat do jedné streamovací tabulky místo použití UNION klauzule v dotazu. Pomocí zpracování přidávacího toku místo UNION můžete aktualizovat cílovou tabulku přírůstkově bez spuštění úplné aktualizace. Příklad sjednocení, které je tímto způsobem provedeno, naleznete v části Příklad: Použití zpracování toku připojení místo UNION.

Cílem výstupu záznamů při zpracování toku připojování může být existující tabulka nebo nová tabulka. V případě dotazů v Pythonu vytvořte cílovou tabulku pomocí funkce create_streaming_table( ).

Následující příklad přidá dva toky pro stejný cíl a vytvoří sjednocení dvou zdrojových tabulek:

Python

from pyspark import pipelines as dp

# create a streaming table
dp.create_streaming_table("customers_us")

# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
  return spark.readStream.table("customers_us_west")

# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
  return spark.readStream.table("customers_us_east")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;

-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);

-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);

Důležité

  • Pokud potřebujete definovat omezení kvality dat s očekáváními, definujte očekávání v cílové tabulce jako součást create_streaming_table() funkce nebo v existující definici tabulky. V definici @append_flow nelze definovat očekávání.
  • Toky jsou identifikovány názvem toku a tento název se používá k identifikaci kontrolních bodů streamování. Použití názvu toku k identifikaci kontrolního bodu znamená následující:
    • Pokud se existující tok v kanálu přejmenuje, kontrolní bod se nepřenese a přejmenovaný tok je v podstatě zcela novým tokem.
    • Název toku v kanálu nemůžete znovu použít, protože stávající kontrolní bod neodpovídá nové definici toku.

Typy toků

Výchozí toky pro streamované tabulky a materializovaná zobrazení jsou toky přírůstků. Můžete také vytvořit toky pro čtení z change data capture datových zdrojů. Následující tabulka popisuje různé typy toků.

Typ toku Description
Append Přidávací toky jsou nejběžnějším typem toku, kde se nové záznamy ve zdroji zapisují do cíle s každou aktualizací. Odpovídají režimu připojení ve strukturovaném streamování. Můžete přidat ONCE příznak, který označuje dávkový dotaz, jehož data by se měla vložit do cíle pouze jednou, pokud se cíl plně neaktualizuje. Libovolný počet přidávacích toků může zapisovat do konkrétního cíle.
Výchozí toky (vytvořené s cílovou tabulkou streamování nebo materializovaným zobrazením) budou mít stejný název jako cíl. Jiné cíle nemají výchozí toky.
Automatické CDC (dříve známé jako použít změny) Automatický tok CDC přijímá dotaz obsahující záznam změn dat (CDC). Automatické toky CDC mohou cílit pouze na streamovací tabulky a zdroj musí být streamovacím zdrojem (i v případě ONCE toků). Několik automatických toků CDC může cílit na jednu streamovací tabulku. Streamovací tabulka, která slouží jako cíl pro automatizovaný tok CDC, může být cílem pouze jiných automatizovaných toků CDC.
Další informace o datech CDC najdete v tématu Automatizovaná rozhraní API CDC: Zjednodušení zachytávání změn dat pomocí kanálů.

Další informace

Další informace o tocích a jejich použití najdete v následujících tématech: