本快速入門說明如何建立一個包含 Spark 結構化串流的 Python 程式碼的 Spark 作業定義,將資料置入 Lakehouse,然後透過 SQL 分析端點提供資料。 完成本快速入門之後,您將擁有持續執行的 Spark 作業定義,而 SQL 分析端點可以檢視傳入的資料。
建立 Python 指令碼
使用下列 Python 腳本,使用 Apache Spark 在湖屋中建立串流 Delta 資料表。 指令碼會讀取產生的資料串流 (每秒一列),並以附加模式將它寫入名為 streamingtable的 Delta 資料表。 它會將資料和檢查點資訊儲存在指定的湖倉中。
使用下列 Python 程式碼,使用 Spark 結構化串流來取得湖屋資料表中的資料。
from pyspark.sql import SparkSession if __name__ == "__main__": # Start Spark session spark = SparkSession.builder \ .appName("RateStreamToDelta") \ .getOrCreate() # Table name used for logging tableName = "streamingtable" # Define Delta Lake storage path deltaTablePath = f"Tables/{tableName}" # Create a streaming DataFrame using the rate source df = spark.readStream \ .format("rate") \ .option("rowsPerSecond", 1) \ .load() # Write the streaming data to Delta query = df.writeStream \ .format("delta") \ .outputMode("append") \ .option("path", deltaTablePath) \ .option("checkpointLocation", f"{deltaTablePath}/_checkpoint") \ .start() # Keep the stream running query.awaitTermination()將腳本儲存為 Python 檔案 (.py) 在本機電腦中。
建立湖倉
使用下列步驟來建立湖庫:
流覽至您想要的工作區,或視需要建立新的工作區。
若要建立 Lakehouse,請從工作區選取 [新增專案],然後在開啟的面板中選取 [Lakehouse]。
輸入您的 Lakehouse 名稱,然後選取 [建立]。
建立 Spark 任務定義
使用下列步驟來建立 Spark 任務定義:
從您建立湖倉的相同工作區中,選取 新增項目。
在開啟的面板中,在 [ 取得資料] 底下,選取 [Spark 作業定義]。
輸入 Spark 作業定義的名稱,然後選取 [建立]。
選取 [上傳] ,然後選取您在上一個步驟中建立的 Python 檔案。
在 [湖屋參考] 底下,選擇您建立的湖屋。
設定 Spark 作業定義的重試原則
使用下列步驟來設定 Spark 任務定義的重試原則:
從頂端功能表中,選取 [設定] 圖示。
開啟[最佳化]索引標籤,然後將[重試原則]觸發設為開啟。
定義重試嘗試次數上限,或核取 [允許無限次嘗試]。
指定每次重試嘗試之間的時間,然後選取 [ 套用]。
Note
重試策略設置的生命周期限制為 90 天。 啟用重試原則之後,作業會在 90 天內根據原則重新啟動。 在此期間之後,重試原則將自動停止運作,且作業將會終止。 然後,使用者必須手動重新啟動作業,這反過來會重新啟用重試原則。
執行和監視 Spark 任務定義
使用 SQL 分析端點檢視資料
腳本執行之後,會在湖屋中建立名為 streamingtable 的數據表,其中包含 時間戳記 和 值 資料行。 您可以使用 SQL 分析端點來檢視資料:
從工作區開啟您的 Lakehouse。
從右上角切換至 SQL 分析端點 。
從左側導覽窗格中,展開 [結構描述 > dbo >資料表],選取 [ streamingtable ] 以預覽資料。