共用方式為


使用 Delta Live Tables 流程以累加方式載入和處理數據

本文說明什麼是流程,以及如何使用 Delta Live Tables 管線中的流程,以累加方式將數據從來源處理到目標串流數據表。 在 Delta 實時數據表中,流程會以兩種方式定義:

  1. 當您建立更新串流數據表的查詢時,會自動定義流程。
  2. Delta Live Tables 也提供功能來明確定義流程,以進行更複雜的處理,例如從多個串流來源附加至串流數據表。

本文討論當您定義查詢以更新串流數據表時所建立的隱含流程,然後提供語法的詳細數據來定義更複雜的流程。

什麼是流程?

在 Delta Live Tables 中,流程是串流查詢,可累加處理源數據以更新目標串流數據表。 您在管線中建立的大部分差異實時數據表數據集都會將流程定義為查詢的一部分,而且不需要明確定義流程。 例如,您會在單一 DDL 命令的 Delta Live Tables 中建立串流數據表,而不是使用個別的數據表和流程語句來建立串流數據表:

注意

CREATE FLOW 範例僅供說明之用,並包含無效 Delta Live Tables 語法的關鍵詞。

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

除了查詢所定義的預設流程之外,Delta Live Tables Python 和 SQL 介面還提供 附加流程 功能。 附加流程支援需要從多個串流來源讀取數據的處理,以更新單一串流數據表。 例如,當您有現有的串流數據表和流程,而且想要新增寫入此現有串流數據表的串流來源時,您可以使用附加流程功能。

使用附加流程從多個來源數據流寫入串流數據表

注意

若要使用附加流程處理,您的管線必須設定為使用 預覽通道

@append_flow使用 Python 介面中的裝飾專案,或 CREATE FLOW SQL 介面中的 子句,從多個串流來源寫入串流數據表。 使用附加流程來處理工作,例如:

  • 新增串流來源,以將數據附加至現有的串流數據表,而不需要完整重新整理。 例如,您可能會有一個數據表,結合您操作的每個區域的區域數據。 隨著新區域推出,您可以將新的區域數據新增至數據表,而不需要執行完整重新整理。 請參閱 範例:從多個 Kafka 主題寫入串流數據表。
  • 藉由附加遺漏的歷程記錄數據來更新串流數據表(回填)。 例如,您有 Apache Kafka 主題寫入的現有串流數據表。 您也會將歷程記錄數據儲存在數據表中,而您需要將一次剛好插入串流數據表中,而且您無法串流數據,因為您的處理包括在插入數據之前執行複雜的匯總。 請參閱 範例:執行一次性數據回填
  • 結合多個來源的數據並寫入至單一串流數據表,而不是在 UNION 查詢中使用 子句。 使用附加流程處理,而不是 UNION 可讓您以累加方式更新目標數據表,而不需要執行 完整重新整理更新。 請參閱 範例:使用附加流程處理,而不是 UNION

附加流程處理所輸出記錄的目標可以是現有的數據表或新的數據表。 針對 Python 查詢,請使用 create_streaming_table() 函式來建立目標數據表。

重要

  • 如果您需要使用 預期來定義資料品質條件約束,請在目標數據表上定義預期做為函式的 create_streaming_table() 一部分或現有的數據表定義。 您無法在 @append_flow 定義中定義預期。
  • 流程是由流程名稱識別,而此名稱用來識別串流檢查點。 使用流程名稱來識別檢查點表示下列各項:
    • 如果管線中的現有流程已重新命名,檢查點不會延續,且重新命名的流程實際上是全新的流程。
    • 您無法重複使用管線中的流程名稱,因為現有的檢查點不符合新的流程定義。

以下是 的 @append_flow語法:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

範例:從多個 Kafka 主題寫入串流數據表

下列範例會建立名為 kafka_target 的串流數據表,並從兩個 Kafka 主題寫入該串流數據表:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

若要深入瞭解 read_kafka() SQL 查詢中使用的數據表值函式,請參閱 SQL 語言參考中的read_kafka

範例:執行一次性數據回填

下列範例會執行查詢,將歷程記錄數據附加至串流數據表:

注意

若要確保當回填查詢是排程或持續執行的管線一部分時,請移除執行管線一次之後的查詢。 若要在到達回填目錄時附加新數據,請將查詢保留原位。

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  cloud_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  cloud_files(
    "path/to/backfill/data/dir",
    "csv"
  );

範例:使用附加流程處理,而不是 UNION

您可以使用附加流程查詢來結合多個來源,並寫入至單一串流數據表,而不是搭配 UNION 子句使用查詢。 使用附加流程查詢,UNION而不是可讓您從多個來源附加至串流數據表,而不需要執行完整重新整理

下列 Python 範例包含結合多個數據源與 UNION 子句的查詢:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

下列範例會將 UNION 查詢取代為附加流程查詢:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/apac",
    "csv"
  );