共用方式為


串流數據表

串流數據表是 Delta 數據表,可額外支援串流或累加數據處理。 串流資料表可以成為管線中一或多個流的目標。

串流數據表是數據擷取的絕佳選擇,原因如下:

  • 每個輸入數據行只會處理一次,這樣可模擬絕大多數資料擷取工作負載(也就是說,藉由將數據行附加或更新插入到數據表中)。
  • 它們可以處理大量只能附加的資料。

串流數據表也是低延遲串流轉換的絕佳選擇,原因如下:

  • 圍繞數據列和時間窗口進行分析和推理
  • 處理大量數據
  • 低延遲

下圖說明串流數據表的運作方式。

顯示串流數據表如何運作的圖表

在每個更新上,與串流數據表相關聯的流程會讀取串流來源中變更的資訊,並將新資訊附加至該數據表。

串流數據表是由單一管線定義和更新。 您會在管線的原始碼中明確定義串流資料表。 管線所定義的數據表無法由任何其他管線變更或更新。 您可以定義多個流程,以附加至單一串流數據表。

備註

當您使用 Databricks SQL 在管線外部建立串流資料表時,Azure Databricks 會建立用來更新資料表的管線。 您可以從工作區的左側導覽中選取 [作業與管線] 來查看管線。 您可以將 [管線類型 ] 資料行新增至檢視。 在管線中定義的串流資料表具有類型 ETL。 在 Databricks SQL 中建立的串流資料表的類型為MV/ST

如需流程的詳細資訊,請參閱 使用 Lakeflow Spark 宣告式管線流程以累加方式載入和處理資料

串流資料表用於資料引入

串流數據表是專為僅附加數據源所設計,且只會處理輸入一次。

下列範例示範如何使用串流數據表從雲端記憶體擷取新的檔案。

Python

from pyspark import pipelines as dp

# create a streaming table
@dp.table
def customers_bronze():
  return (
    spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .option("cloudFiles.inferColumnTypes", "true")
     .load("/Volumes/path/to/files")
  )

若要建立串流資料表,資料集定義必須是串流類型。 當您在資料集定義中使用函 spark.readStream 式時,它會傳回串流資料集。

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
  "/volumes/path/to/files",
  format => "json"
);

串流資料表需要串流資料集。 STREAM關鍵字在read_files之前會告知查詢將資料集視為資料流程。

如需將資料載入串流資料表的詳細資訊,請參閱 在管線中載入資料。

下圖表說明僅附加串流資料表的工作原理。

顯示僅附加串流數據表如何運作的圖表

已附加至串流數據表的數據列將不會在稍後更新至管線時重新查詢。 如果您修改查詢(例如,從 SELECT LOWER (name)SELECT UPPER (name)),現有的數據列將不會更新為大寫,但新的數據列會是大寫。 您可以觸發完整刷新,從源資料表重新查詢所有先前資料,以更新串流資料表中的所有資料列。

串流數據表和低延遲串流

串流數據表是針對透過限定狀態進行低延遲串流所設計。 串流數據表會使用檢查點管理,使其非常適合用於低延遲串流。 不過,它們預期資料流是自然界限或受限於水印。

自然系結數據流是由具有妥善定義開始和結束的串流數據源所產生。 自然系結數據流的範例是從檔案目錄讀取數據,其中在放置初始檔案批次之後,不會新增任何新檔案。 數據流會被視為限定,因為檔案數目有限,然後,數據流會在處理所有檔案之後結束。

您也可以使用水印來綁定資料流。 Spark 結構化串流中的浮水印是一種機制,可藉由指定系統應該等候延遲事件的時間長度,再將時間範圍視為完成,來協助處理延遲數據。 沒有水印的無界限的資料流可能會導致管線因為記憶體壓力而失敗。

如需具狀態串流處理的詳細資訊,請參閱 使用浮水印最佳化有狀態處理

串流快照連結

串流快照聯結是指在數據流啟動時,數據流與快照維度之間的聯結。 如果維度在數據流啟動之後變更,這些聯結不會重新計算,因為維度數據表會被視為時間快照集,而且除非重載或重新整理維度數據表,否則數據流開始之後維度數據表的變更不會反映。 如果您接受聯結中的小差異,這樣的行為是合理的。 例如,當交易數目的大小比客戶數目大許多個數量級時,可以接受近似聯結。

在下列程式代碼範例中,我們將一個包含兩條記錄的維度表「客戶」與不斷增加的數據集「交易」進行聯結。 我們將這兩個數據集之間的聯結具體化為名為 sales_report的數據表。 請注意,如果外部進程藉由新增一個新行來更新客戶表(customer_id=3, name=Zoya),這個新行將不會出現在聯接中,因為數據流啟動時,靜態維度表已快照。

from pyspark import pipelines as dp

@dp.temporary_view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
  return spark.readStream.table("transactions")

# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dp.temporary_view
def v_customers():
  return spark.read.table("customers")

@dp.table
def sales_report():
  facts = spark.readStream.table("v_transactions")
  dims = spark.read.table("v_customers")

  return facts.join(dims, on="customer_id", how="inner")

串流數據表限制

串流資料表有下列限制:

  • 有限的演進: 您可以變更查詢,而不需重新計算整個數據集。 如果沒有完整重新整理,串流資料表只會看到每個資料列一次,因此不同的查詢會處理不同的資料列。 例如,如果您將 UPPER() 新增到查詢的欄位中,則只有在變更後處理的資料列會呈現為大寫形式。 這表示您必須注意在您的資料集上執行的所有先前版本的查詢。 若要重新處理變更之前已處理的現有資料列,需要完整重新整理。
  • 狀態管理: 串流表具有低延遲,因此您必須確保其所運行的串流是自然封閉的或使用水位標記來封閉。 如需詳細資訊,請參閱 使用浮水印最佳化有狀態處理
  • 聯結不會重新計算: 串流數據表中的聯結不會在維度變更時重新計算。 此特性適用於「快速但易錯」的情況。 如果您想要讓檢視一律正確,您可能想要使用具體化檢視。 具體化檢視一律正確,因為它們會在維度變更時自動重新計算聯結。 如需詳細資訊,請參閱 具體化檢視。