搭配串流資料使用 Delta Lake

已完成

我們目前探索的所有資料都是檔案中的靜態資料。 但許多資料分析案例涉及的串流資料必須近即時處理。 例如,您可能需要擷取物聯網 (IoT) 裝置發出的讀取,同時儲存在資料表中儲存這些讀取。

Spark 結構化串流

典型的串流處理解決方案涉及持續從來源讀取資料流、選擇性處理資料流並選取特定欄位、彙總並為值分組,或操作資料並將結果寫入接收

Spark 包含原生支援 Spark 結構化串流的串流資料,即以無限資料框架為基礎的 API,在處理期間此 API 會擷取串流資料。 Spark 結構化串流資料框架可以從許多不同的串流來源讀取資料,包括網路連接埠、即時訊息代理服務,例如 Azure 事件中樞或 Kafka 或檔案系統位置。

提示

如需 Spark 結構化串流的詳細資訊,請參閱 Spark 文件中的「結構化串流程式設計指南」。

使用 Delta Lake 資料表串流

您可以使用 Delta Lake 資料表作為 Spark 結構化串流的來源或接收。 例如,您可以從 IoT 裝置擷取即時的資料串流,並將資料流直接寫入 Delta Lake 資料表作為接收,讓您可以查詢資料表,並查看最新的串流資料。 或者,Delta 資料表可作為串流來源讀取,讓您新增資料至資料表後,可以持續報告新資料。

使用 Delta Lake 資料表作為串流來源

在下列 PySpark 範例中,Delta Lake 資料表會用來儲存網際網路銷售訂單的詳細資料。 附加新資料時,建立的資料流會從 Delta Lake 資料表資料夾讀取資料。

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("/delta/internetorders")

# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.show()

注意

使用 Delta Lake 資料表作為串流來源時,資料流中只會包含附加作業。 除非您指定 ignoreChangesignoreDeletes 選項,否則資料修改會發生錯誤。

從 Delta Lake 資料表將資料讀取至串流資料框架後,即可使用 Spark 結構化串流 API 處理資料。 在上述範例中,僅顯示資料框架,但您可以使用 Spark 結構化串流在時態性視窗彙總資料 (例如計算每分鐘下單的數目),並傳送彙總結果至下游程序,取得近即時的視覺效果。

使用 Delta Lake 資料表作為串流接收

在下列 PySpark 範例中,資料流是從資料夾中的 JSON 檔案讀取。 每個檔案中的 JSON 資料都包含 {"device":"Dev1","status":"ok"} 格式的 IoT 裝置狀態,新增檔案至資料夾時,新資料會新增至資料流。 輸入資料流是無限資料框架,後續會以差異格式寫入 Delta Lake 資料表的資料夾位置。

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
streamFolder = '/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

注意

checkpointLocation 選項會用來寫入追蹤資料處理狀態的檢查點檔案。 此檔案讓您可以從串流處理中止時復原失敗。

啟動串流程序後,您可以查詢目前串流輸出寫入的 Delta Lake 資料表,並查看最新資料。 例如,下列程式碼會建立並查詢 Delta Lake 資料表資料夾的目錄資料表:

%sql

CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';

SELECT device, status
FROM DeviceTable;

若要停止將資料流寫入 Delta Lake 資料表,請使用串流查詢的 stop 方法:

delta_stream.stop()

提示

如需使用 Delta Lake 資料表串流資料的詳細資訊,請參閱 Delta Lake 文件中的「資料表串流讀取和寫入 」。