适用于 Fabric 数据仓库的 Spark 连接器使 Spark 开发人员和数据科学家能够访问和处理来自仓库和湖屋的 SQL 分析终结点的数据。 该连接器提供以下功能:
- 可以从同一工作区或跨多个工作区处理仓库或 SQL 分析端点中的数据。
- Lakehouse 的 SQL 分析端点根据工作区上下文自动发现。
- 该连接器提供简化的 Spark API,抽象化底层复杂性,并且只需一行代码即可操作。
- 访问表或视图时,该连接器支持在 SQL 引擎级别定义的安全模型。 这些模型包括对象级安全性 (OLS)、行级别安全性 (RLS) 和列级别安全性 (CLS)。
- 该连接器预安装在 Fabric 运行时中,无需单独安装。
身份验证
Microsoft Entra 身份验证是一种集成身份验证方法。 用户登录到 Microsoft Fabric 工作区,其凭证会自动传递到 SQL 引擎进行身份验证和授权。 凭证是自动映射的,用户不需要提供特定的配置选项。
注意
Fabric 数据仓库的 Spark 连接器仅支持交互式Microsoft Entra 用户身份验证。 不支持服务主体身份验证。
权限
要连接到 SQL 引擎,用户至少需要仓库或 SQL 分析端点(项目级别)上的读取权限(类似于 SQL Server 中的 CONNECT 权限)。 用户还需要精细的对象级权限才能从特定表或视图读取数据。 若要了解详细信息,请参阅 Microsoft Fabric 中数据仓库的安全性。
代码模板和示例
使用方法签名
以下命令显示读取请求的 synapsesql 方法签名。 从仓库和湖屋的 SQL 分析终结点访问表或视图需要 tableName 参数(包含三个部分)。 根据你的场景,使用以下名称更新参数:
- 第 1 部分:仓库或湖屋的名称。
- 第 2 部分:架构的名称。
- 第 3 部分:表或视图的名称。
synapsesql(tableName:String="<Part 1.Part 2.Part 3>") => org.apache.spark.sql.DataFrame
除了直接从表或视图中读取,此连接器还允许你指定自定义或传递查询,然后,该查询将传递给 SQL 引擎,结果返回到 Spark。
spark.read.option(Constants.DatabaseName, "<warehouse/lakeshouse name>").synapsesql("<T-SQL Query>") => org.apache.spark.sql.DataFrame
虽然此连接器会自动发现指定仓库/湖屋的终结点,但如果要显式指定它,则可以执行此操作。
//For warehouse
spark.conf.set("spark.datawarehouse.<warehouse name>.sqlendpoint", "<sql endpoint,port>")
//For lakehouse
spark.conf.set("spark.lakehouse.<lakeshouse name>.sqlendpoint", "<sql endpoint,port>")
//Read from table
spark.read.synapsesql("<warehouse/lakeshouse name>.<schema name>.<table or view name>")
读取同一工作区内的数据
重要
请在笔记本开头或在开始使用连接器之前运行这些导入语句。
import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants
以下代码是从 Spark 数据帧中的表或视图读取数据的示例:
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
以下代码是从行数限制为 10 的 Spark 数据帧中的表或视图读取数据的示例:
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)
以下代码是应用筛选条件后从 Spark 数据帧中的表或视图读取数据的示例:
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column name == 'value'")
以下代码是一个示例,用于从 Spark 数据帧中的表或视图读取选定列的数据:
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("column A", "Column B")
跨工作区读取数据
要跨工作区访问和读取仓库或湖屋中的数据,可以指定仓库或湖屋所在的工作区 ID,然后指定湖屋或仓库项 ID。 以下代码行举例说明如何在具有指定工作区 ID 和湖屋/仓库 ID 的仓库或湖屋中,从 Spark DataFrame 的表或视图中读取数据:
# For lakehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.LakehouseId, "<lakehouse item id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
# For warehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.DatawarehouseId, "<warehouse item id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
注意
运行笔记本时,连接器默认情况下会在连接到笔记本的湖屋的工作区中查找指定的仓库或湖屋。 要从另一个工作区引用仓库或湖屋,请指定工作区 ID 和湖屋或仓库项 ID,如上所示。
根据仓库中的数据创建湖屋表
这些代码行举例说明如何从 Spark DataFrame 中的表或视图读取数据,并使用它来创建湖屋表:
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
df.write.format("delta").saveAsTable("<Lakehouse table name>")
将 Spark 数据帧数据写入数据仓库表
此连接器采用对Fabric DW表的两阶段写入方法。 最初,它将 Spark 数据帧数据暂存到中间存储中,然后使用 COPY INTO 命令将数据引入 Fabric DW 表。 此方法可确保通过增加数据量实现可伸缩性。
支持的数据帧保存模式
将数据帧的源数据写入仓库中的目标表时,支持以下保存模式:
- ErrorIfExists (默认保存模式):如果目标表存在,则会中止写入,并将异常返回给被调用方。 否则,将用数据创建一个新表。
- 忽略:如果目标表存在,写入操作将不处理请求,并且不会返回错误。 否则,将用数据创建一个新表。
- 覆盖:如果目标表存在,则会让数据替换目标中的现有数据。 否则,将用数据创建一个新表。
- 追加:如果目标表存在,则会向其追加新数据。 否则,将用数据创建一个新表。
以下代码演示了将 Spark 数据帧的数据写入 Fabric DW 表的示例:
df.write.synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>") # this uses default mode - errorifexists
df.write.mode("errorifexists").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("ignore").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("append").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("overwrite").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
注意
连接器仅支持写入 Fabric DW 表,因为 Lakehouse 的 SQL 分析终结点是只读的。
故障排除
完成后,读取响应片段显示在单元格的输出中。 当前单元格失败也会取消笔记本的后续单元格执行操作。 Spark 应用程序日志中提供了详细的错误信息。
使用此连接器的注意事项
目前,连接器:
- 支持从 Fabric 仓库和湖屋项的 SQL 分析终结点检索或读取数据。
- 支持使用不同的保存模式将数据写入仓库表 - 这仅适用于最新的 GA 运行时,即 Runtime 1.3。
- 启用
Private Link时,在租户级别不支持写入操作;启用Private Link时,在工作区级别不支持读取和写入操作。 - Fabric DW 现在支持
Time Travel,但此连接器不适用于具有时间旅行语法的查询。 - 保留使用签名(如 Apache Spark for Azure Synapse Analytics 随附的签名)以保持一致性。 但是,连接和使用 Azure Synapse Analytics 专用 SQL 池并非后向兼容。
- 在提交基于表(包含 3 个部分)/视图名称的查询之前,将通过添加转义字符来处理带有特殊字符的列名。 如果是自定义或基于直通查询的读取,用户需要转义包含特殊字符的列名。