共用方式為


適用於 Microsoft Fabric 數據倉儲的 Spark 連接器

適用於 Fabric 數據倉儲的 Spark 連接器可讓 Spark 開發人員和數據科學家存取和處理來自倉儲和 Lakehouse SQL 分析端點的數據。 連接器提供下列功能:

  • 您可以在相同工作區或跨多個工作區使用來自倉儲或 SQL 分析端點的資料。
  • Lakehouse 的 SQL 分析端點會根據工作區內容自動探索。
  • 連接器具有簡化的Spark API、擷取基礎複雜性,並只使用一行程式碼運作。
  • 當您存取資料表或檢視時,連接器會維護在 SQL 引擎層級定義的安全性模型。 這些模型包括物件層級安全性(OLS)、資料列層級安全性(RLS),以及資料行層級安全性(CLS)。
  • 連接器會在 Fabric 執行時間內預安裝,可免除個別安裝的需求。

驗證

Microsoft Entra 驗證是整合式驗證方法。 使用者登入 Microsoft Fabric 工作區,且其認證會自動傳遞至 SQL 引擎以進行驗證和授權。 驗證憑證會自動映射,使用者不需要提供特定的配置選項。

權限

若要連線到 SQL 引擎,使用者至少需要倉儲或 SQL 分析端點(項目層級)的讀取許可權(類似於 SQL Server 中的 CONNECT 許可權)。 使用者也需要細微的物件層級許可權,才能從特定資料表或檢視表讀取資料。 若要深入瞭解,請參閱 Microsoft Fabric 中的資料倉儲安全性。

程式碼範本和範例

使用方法簽名

下列命令顯示讀取要求的 synapsesql 方法簽名。 需要三部分 tableName 自變數,才能從倉儲和 Lakehouse 的 SQL 分析端點存取資料表或檢視。 根據您的案例,以下列名稱更新引數:

  • 第 1 部分:倉庫或資料湖庫的名稱。
  • 第 2 部分:結構描述的名稱。
  • 第 3 部分:資料表或檢視表的名稱。
synapsesql(tableName:String="<Part 1.Part 2.Part 3>") => org.apache.spark.sql.DataFrame

除了直接從資料表或檢視讀取之外,此連接器也可讓您指定自定義或傳遞查詢,此查詢會傳遞至 SQL 引擎,並將結果傳回 Spark。

spark.read.option(Constants.DatabaseName, "<warehouse/lakeshouse name>").synapsesql("<T-SQL Query>") => org.apache.spark.sql.DataFrame

雖然此連接器會自動探索指定之倉儲/Lakehouse 的端點,但如果您想要明確指定,您可以執行此動作。

//For warehouse
spark.conf.set("spark.datawarehouse.<warehouse name>.sqlendpoint", "<sql endpoint,port>")
//For lakehouse
spark.conf.set("spark.lakehouse.<lakeshouse name>.sqlendpoint", "<sql endpoint,port>")
//Read from table
spark.read.synapsesql("<warehouse/lakeshouse name>.<schema name>.<table or view name>") 

讀取相同工作區中的資料

重要

在筆記本開頭或在開始使用連接器之前,執行這些匯入語句:

import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants  

下列程式碼是從 Spark DataFrame 中的資料表或檢視讀取資料的範例:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")

以下程式碼是一個範例,展示如何從 Spark DataFrame 中讀取資料表或檢視的資料,並限制至 10 列:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)

下列程式碼是套用篩選條件之後,從 Spark DataFrame 中的資料表或檢視讀取資料的範例:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column name == 'value'")

僅針對選定的資料行,以下程式碼提供了從 Spark DataFrame 中表或視圖讀取資料的範例:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("column A", "Column B")

跨工作區讀取資料

若要跨工作區存取和讀取倉儲或 Lakehouse 的數據,您可以先指定倉儲或 Lakehouse 所在的工作區標識符,並再指定倉儲或 Lakehouse 專案標識符。 下列這一行提供了一個範例,說明如何在指定的工作區 ID 和湖倉/倉儲 ID 下,從倉儲或湖倉中的 Spark 數據框架讀取資料表或檢視資料。

# For lakehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.LakehouseId, "<lakehouse item id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")

# For warehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.DatawarehouseId, "<warehouse item id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")

注意

當您運行筆記本時,連接器預設會在附加於筆記本的湖倉工作區中尋找指定的倉儲或湖倉。 若要從另一個工作區參考倉庫或 Lakehouse,請指定工作區識別碼和 Lakehouse 或倉庫項目識別碼,如上所示。

根據倉儲的資料建立 Lakehouse 資料表

這些程式代碼行提供從 Spark DataFrame 中的數據表或檢視讀取數據的範例,並用它來建立 Lakehouse 數據表:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
df.write.format("delta").saveAsTable("<Lakehouse table name>")

將 Spark 數據框架數據寫入倉儲數據表

此連接器會採用 Fabric DW 資料表的兩階段寫入過程。 一開始,它會將Spark資料框架數據分階段至中繼記憶體,然後使用 COPY INTO 命令將數據內嵌至 Fabric DW 資料表。 這種方法可確保數據量增加的延展性。

支援的 DataFrame 儲存模式

將數據框架的源數據寫入倉儲中的目的地資料表時,支援下列儲存模式:

  • ErrorIfExists (預設儲存模式):如果目的地數據表存在,則寫入會中止,並回傳例外給呼叫者。 否則,系統會使用數據建立新的數據表。
  • 忽略:如果目標資料表存在,則寫入操作會忽略寫入要求,而不會傳回錯誤。 否則,系統會使用數據建立新的數據表。
  • 覆寫:如果目的地數據表存在,則目的地中的現有數據會取代為數據。 否則,系統會使用數據建立新的數據表。
  • 附加:如果目的地數據表存在,則會將新數據附加至該數據表。 否則,系統會使用數據建立新的數據表。

下列程式代碼顯示將 Spark 資料框架的數據寫入 Fabric DW 資料表的範例:

df.write.synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>") # this uses default mode - errorifexists

df.write.mode("errorifexists").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("ignore").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("append").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("overwrite").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")

注意

連接器僅支援寫入 Fabric DW 數據表,因為 Lakehouse 的 SQL 分析端點是唯讀的。

疑難排解

完成時,讀取回應片段會出現在儲存格的輸出中。 當前儲存格中的故障也會取消筆記本之後的儲存格執行。 Spark 應用程式記錄中提供詳細的錯誤資訊。

使用此連接器的考慮

目前,連接器:

  • 支援從 Lakehouse 項目的 Fabric 倉儲和 SQL 分析端點擷取或讀取數據。
  • 支援使用不同的儲存模式將數據寫入倉儲數據表 - 這僅適用於最新的 GA 運行時間,也就是 Runtime 1.3。 此外,目前當Private Link被啟用且Public Access遭到封鎖時,寫入作業無法運作。
  • Fabric DW 現在支援 Time Travel,但此連接器不適用於具有時間旅行語法的查詢。
  • 保留使用簽章,就像 Apache Spark for Azure Synapse Analytics 隨附的簽章一樣,以保持一致性。 不過,與 Azure Synapse Analytics 中的專用 SQL 集區進行連接和操作並不向後相容。
  • 在提交查詢之前,系統會根據三部分的資料表或檢視名稱,為具有特殊字元的資料行名稱新增逸出字元來處理。 如果是以自定義或傳遞查詢為基礎的讀取,使用者必須逸出包含特殊字元的資料行名稱。