Sdílet prostřednictvím


Přírůstkové načítání a zpracování dat pomocí toků Delta Live Tables

Tento článek vysvětluje, co jsou toky a jak můžete pomocí toků v kanálech Delta Live Tables přírůstkově zpracovávat data ze zdroje do cílové streamované tabulky. V Delta Live Tables jsou toky definovány dvěma způsoby:

  1. Tok se definuje automaticky při vytváření dotazu, který aktualizuje tabulku streamování.
  2. Delta Live Tables také poskytuje funkce pro explicitní definování toků pro složitější zpracování, jako je připojení ke streamované tabulce z více streamovaných zdrojů.

Tento článek popisuje implicitní toky, které se vytvoří při definování dotazu pro aktualizaci streamované tabulky, a poté poskytuje podrobnosti o syntaxi pro definování složitějších toků.

Co je tok?

V Delta Live Tables je tok streamovací dotaz, který zpracovává zdrojová data přírůstkově za účelem aktualizace cílové streamovací tabulky. Většina datových sad Delta Live Tables, které vytvoříte v kanálu, definuje tok jako součást dotazu a nevyžaduje explicitní definování toku. Například vytvoříte streamovací tabulku v rozdílových živých tabulkách v jednom příkazu DDL místo použití samostatných příkazů tabulky a toku k vytvoření tabulky streamování:

Poznámka:

Tento CREATE FLOW příklad je k dispozici pouze pro ilustrativní účely a obsahuje klíčová slova, která nejsou platná syntaxe dynamických tabulek Delta.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Kromě výchozího toku definovaného dotazem poskytují rozhraní Delta Live Tables Python a SQL funkci přidávacího toku . Připojit tok podporuje zpracování, které vyžaduje čtení dat z více streamovaných zdrojů, aby se aktualizovala jedna streamovací tabulka. Funkci toku připojení můžete použít například v případě, že máte existující streamovací tabulku a tok a chcete přidat nový zdroj streamování, který zapisuje do této existující streamovací tabulky.

Použití přidávacího toku k zápisu do streamované tabulky z více zdrojových datových proudů

Poznámka:

Pokud chcete použít zpracování toku připojení, musí být váš kanál nakonfigurovaný tak, aby používal kanál Preview.

@append_flow Pomocí dekorátoru v rozhraní Pythonu nebo CREATE FLOW klauzule v rozhraní SQL můžete zapisovat do streamované tabulky z více zdrojů streamování. Použijte tok připojení ke zpracování úloh, jako je například následující:

  • Přidejte streamované zdroje, které připojují data k existující streamované tabulce bez nutnosti úplné aktualizace. Můžete mít například tabulku, která kombinuje regionální data z každé oblasti, ve které pracujete. Při nasazení nových oblastí můžete do tabulky přidat nová data oblasti, aniž byste provedli úplnou aktualizaci. Viz příklad: Zápis do streamované tabulky z několika témat Kafka.
  • Aktualizujte streamovací tabulku přidáním chybějících historických dat (obnovení). Máte například existující streamovací tabulku napsanou tématem Apache Kafka. Máte také historická data uložená v tabulce, kterou potřebujete vložit přesně jednou do streamované tabulky, a nemůžete je streamovat, protože zpracování zahrnuje provedení komplexní agregace před vložením dat. Viz příklad: Spuštění jednorázového obnovení dat
  • Kombinovat data z více zdrojů a zapisovat do jedné streamovací tabulky místo použití UNION klauzule v dotazu. Pomocí zpracování toku připojení místo UNION toho můžete aktualizovat cílovou tabulku přírůstkově bez spuštění úplné aktualizace. Viz příklad: Místo funkce UNION použijte zpracování toku připojení.

Cílem výstupu záznamů při zpracování toku připojení může být existující tabulka nebo nová tabulka. V případě dotazů v Pythonu vytvořte cílovou tabulku pomocí funkce create_streaming_table( ).

Důležité

  • Pokud potřebujete definovat omezení kvality dat s očekáváními, definujte očekávání v cílové tabulce jako součást create_streaming_table() funkce nebo v existující definici tabulky. V definici @append_flow nelze definovat očekávání.
  • Toky jsou identifikovány názvem toku a tento název se používá k identifikaci kontrolních bodů streamování. Použití názvu toku k identifikaci kontrolního bodu znamená následující:
    • Pokud se existující tok v kanálu přejmenuje, kontrolní bod se nepřenese a přejmenovaný tok je v podstatě zcela novým tokem.
    • Název toku v kanálu nemůžete znovu použít, protože stávající kontrolní bod neodpovídá nové definici toku.

Následuje syntaxe pro @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Příklad: Zápis do streamované tabulky z několika témat Kafka

Následující příklady vytvoří tabulku streamování s názvem kafka_target a zápisem do této tabulky streamování ze dvou témat Kafka:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Další informace o read_kafka() funkci s hodnotou tabulky, která se používá v dotazech SQL, najdete v read_kafka v referenční dokumentaci jazyka SQL.

Příklad: Spuštění jednorázového obnovení dat

Následující příklady spustí dotaz, který připojí historická data do streamované tabulky:

Poznámka:

Pokud chcete zajistit skutečné jednorázové obnovení, pokud je dotaz backfill součástí kanálu, který běží podle plánu nebo nepřetržitě, odeberte dotaz po spuštění kanálu jednou. Pokud chcete přidat nová data, pokud dorazí do adresáře backfill, ponechte dotaz na místě.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Příklad: Použití zpracování toku připojení místo UNION

Místo použití dotazu s UNION klauzulí můžete pomocí přidávacích dotazů toku kombinovat více zdrojů a zapisovat do jedné streamovací tabulky. Použití dotazů přidávacího toku místo UNION toho umožňuje připojit se k streamované tabulce z více zdrojů bez spuštění úplné aktualizace.

Následující příklad Pythonu obsahuje dotaz, který kombinuje více zdrojů dat s klauzulí UNION :

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

Následující příklady nahrazují UNION dotaz přidávacími dotazy toku:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/apac",
    "csv"
  );