Přírůstkové načítání a zpracování dat pomocí toků Delta Live Tables
Tento článek vysvětluje, co jsou toky a jak můžete pomocí toků v kanálech Delta Live Tables přírůstkově zpracovávat data ze zdroje do cílové streamované tabulky. V Delta Live Tables jsou toky definovány dvěma způsoby:
- Tok se definuje automaticky při vytváření dotazu, který aktualizuje tabulku streamování.
- Delta Live Tables také poskytuje funkce pro explicitní definování toků pro složitější zpracování, jako je připojení ke streamované tabulce z více streamovaných zdrojů.
Tento článek popisuje implicitní toky, které se vytvoří při definování dotazu pro aktualizaci streamované tabulky, a poté poskytuje podrobnosti o syntaxi pro definování složitějších toků.
Co je tok?
V Delta Live Tables je tok streamovací dotaz, který zpracovává zdrojová data přírůstkově za účelem aktualizace cílové streamovací tabulky. Většina datových sad Delta Live Tables, které vytvoříte v kanálu, definuje tok jako součást dotazu a nevyžaduje explicitní definování toku. Například vytvoříte streamovací tabulku v rozdílových živých tabulkách v jednom příkazu DDL místo použití samostatných příkazů tabulky a toku k vytvoření tabulky streamování:
Poznámka:
Tento CREATE FLOW
příklad je k dispozici pouze pro ilustrativní účely a obsahuje klíčová slova, která nejsou platná syntaxe dynamických tabulek Delta.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Kromě výchozího toku definovaného dotazem poskytují rozhraní Delta Live Tables Python a SQL funkci přidávacího toku . Připojit tok podporuje zpracování, které vyžaduje čtení dat z více streamovaných zdrojů, aby se aktualizovala jedna streamovací tabulka. Funkci toku připojení můžete použít například v případě, že máte existující streamovací tabulku a tok a chcete přidat nový zdroj streamování, který zapisuje do této existující streamovací tabulky.
Použití přidávacího toku k zápisu do streamované tabulky z více zdrojových datových proudů
Poznámka:
Pokud chcete použít zpracování toku připojení, musí být váš kanál nakonfigurovaný tak, aby používal kanál Preview.
@append_flow
Pomocí dekorátoru v rozhraní Pythonu nebo CREATE FLOW
klauzule v rozhraní SQL můžete zapisovat do streamované tabulky z více zdrojů streamování. Použijte tok připojení ke zpracování úloh, jako je například následující:
- Přidejte streamované zdroje, které připojují data k existující streamované tabulce bez nutnosti úplné aktualizace. Můžete mít například tabulku, která kombinuje regionální data z každé oblasti, ve které pracujete. Při nasazení nových oblastí můžete do tabulky přidat nová data oblasti, aniž byste provedli úplnou aktualizaci. Viz příklad: Zápis do streamované tabulky z několika témat Kafka.
- Aktualizujte streamovací tabulku přidáním chybějících historických dat (obnovení). Máte například existující streamovací tabulku napsanou tématem Apache Kafka. Máte také historická data uložená v tabulce, kterou potřebujete vložit přesně jednou do streamované tabulky, a nemůžete je streamovat, protože zpracování zahrnuje provedení komplexní agregace před vložením dat. Viz příklad: Spuštění jednorázového obnovení dat
- Kombinovat data z více zdrojů a zapisovat do jedné streamovací tabulky místo použití
UNION
klauzule v dotazu. Pomocí zpracování toku připojení místoUNION
toho můžete aktualizovat cílovou tabulku přírůstkově bez spuštění úplné aktualizace. Viz příklad: Místo funkce UNION použijte zpracování toku připojení.
Cílem výstupu záznamů při zpracování toku připojení může být existující tabulka nebo nová tabulka. V případě dotazů v Pythonu vytvořte cílovou tabulku pomocí funkce create_streaming_table( ).
Důležité
- Pokud potřebujete definovat omezení kvality dat s očekáváními, definujte očekávání v cílové tabulce jako součást
create_streaming_table()
funkce nebo v existující definici tabulky. V definici@append_flow
nelze definovat očekávání. - Toky jsou identifikovány názvem toku a tento název se používá k identifikaci kontrolních bodů streamování. Použití názvu toku k identifikaci kontrolního bodu znamená následující:
- Pokud se existující tok v kanálu přejmenuje, kontrolní bod se nepřenese a přejmenovaný tok je v podstatě zcela novým tokem.
- Název toku v kanálu nemůžete znovu použít, protože stávající kontrolní bod neodpovídá nové definici toku.
Následuje syntaxe pro @append_flow
:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
Příklad: Zápis do streamované tabulky z několika témat Kafka
Následující příklady vytvoří tabulku streamování s názvem kafka_target
a zápisem do této tabulky streamování ze dvou témat Kafka:
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Další informace o read_kafka()
funkci s hodnotou tabulky, která se používá v dotazech SQL, najdete v read_kafka v referenční dokumentaci jazyka SQL.
Příklad: Spuštění jednorázového obnovení dat
Následující příklady spustí dotaz, který připojí historická data do streamované tabulky:
Poznámka:
Pokud chcete zajistit skutečné jednorázové obnovení, pokud je dotaz backfill součástí kanálu, který běží podle plánu nebo nepřetržitě, odeberte dotaz po spuštění kanálu jednou. Pokud chcete přidat nová data, pokud dorazí do adresáře backfill, ponechte dotaz na místě.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Příklad: Použití zpracování toku připojení místo UNION
Místo použití dotazu s UNION
klauzulí můžete pomocí přidávacích dotazů toku kombinovat více zdrojů a zapisovat do jedné streamovací tabulky. Použití dotazů přidávacího toku místo UNION
toho umožňuje připojit se k streamované tabulce z více zdrojů bez spuštění úplné aktualizace.
Následující příklad Pythonu obsahuje dotaz, který kombinuje více zdrojů dat s klauzulí UNION
:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Následující příklady nahrazují UNION
dotaz přidávacími dotazy toku:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);