适用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器是一种高性能连接器,可用于在大数据分析中使用事务数据,并为即席查询或报告保留结果。 连接器允许将任何 SQL 数据库(本地或云中)用作 Spark 作业的输入数据源或输出数据接收器。
注释
当前此连接器不在主动维护中。 本文仅用于存档目的。
此库包含适用于 SQL Server 和 Azure SQL 平台的 Apache Spark 连接器的源代码。
Apache Spark 是用于大规模数据处理的统一分析引擎。
Maven 提供了两个版本的连接器:2.4.x 兼容版本和 3.0.x 兼容版本。 从 maven.org 下载连接器 ,并使用坐标导入连接器:
连接器 | Maven 坐标 |
---|---|
Spark 2.4.x 兼容连接器 | com.microsoft.azure:spark-mssql-connector:1.0.2 |
Spark 3.0.x 兼容连接器 | com.microsoft.azure:spark-mssql-connector_2.12:1.1.0 |
Spark 3.1.x 兼容连接器 | com.microsoft.azure:spark-mssql-connector_2.12:1.2.0 |
还可以从源生成连接器,也可以从 GitHub 中的“发布”部分下载 jar。 有关连接器的最新信息,请参阅 SQL Spark 连接器 GitHub 存储库。
支持的功能
- 支持所有 Spark 绑定(Scala、Python、R)
- 基本身份验证和 Active Directory (AD) 密钥选项卡支持
- 重排
dataframe
写入支持 - 支持写入 SQL Server 大数据群集中的 SQL Server 单一实例和数据池
- Sql Server 单实例的可靠连接器支持
组件 | 支持的版本 |
---|---|
Apache Spark | 2.4.x、3.0.x、3.1.x |
Scala(编程语言) | 2.11, 2.12 |
用于 SQL Server 的 Microsoft JDBC 驱动程序 | 8.4 |
Microsoft SQL Server | SQL Server 2008 或更高版本 |
Azure SQL 数据库 | 已支持 |
支持的选项
适用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器支持此处定义的选项: SQL DataSource JDBC
此外,还支持以下选项:
选项 | 违约 | DESCRIPTION |
---|---|---|
reliabilityLevel |
BEST_EFFORT |
BEST_EFFORT 或 NO_DUPLICATES 。
NO_DUPLICATES 在执行程序重启情况下实现了可靠的插入 |
dataPoolDataSource |
none |
none 表示未设置值,连接器应写入 SQL Server 单实例。 将此值设置为数据源名称以在大数据群集中写入数据池表 |
isolationLevel |
READ_COMMITTED |
指定 隔离级别 |
tableLock |
false |
使用 TABLOCK 选项进行插入操作,以提高写入性能。 |
schemaCheckEnabled |
true |
在设置为 false 时禁用严格的数据帧和 SQL 表架构检查。 |
其他 大容量复制选项 可以作为选项来设置,并将在写入过程中传递给 dataframe
API。
性能比较
适用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器比用于写入 SQL Server 的通用 JDBC 连接器快 15 倍。 性能特性因类型、数据量和使用的选项而异,并可能在不同运行中显示差异。 以下性能结果是在 Spark dataframe
中使用 143.9M 行更新 SQL 表所需的时间。 Spark 是通过读取dataframe
使用 spark store_sales
TPCDS 基准生成的 HDFS 表构造的。 将读取 store_sales
至 dataframe
的时间排除在外。 结果是基于三次运行的平均值。
连接器类型 | 选项 | DESCRIPTION | 写作时间 |
---|---|---|---|
JDBCConnector |
违约 | 具有默认选项的通用 JDBC 连接器 | 1,385 秒 |
sql-spark-connector |
BEST_EFFORT |
使用默认选项尽最大努力sql-spark-connector |
580 秒 |
sql-spark-connector |
NO_DUPLICATES |
可靠 sql-spark-connector |
709 秒 |
sql-spark-connector |
BEST_EFFORT + tabLock=true |
sql-spark-connector 启用表锁的最佳做法 |
72 秒 |
sql-spark-connector |
NO_DUPLICATES + tabLock=true |
启用表锁后的可靠性sql-spark-connector |
198 秒 |
配置
- Spark 配置:num_executors = 20,executor_memory = '1664 MB',executor_cores = 2
- Data Gen 配置:scale_factor = 50,partitioned_tables = true
- 包含 n 个行的数据文件
store_sales
143,997,590
环境
- SQL Server 大数据群集 CU5
-
master
+ 6 个节点 - 每个节点第 5 代服务器,512 GB Ram,每个节点 4 TB NVM,NIC 10 GB
常见问题
java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException
使用较旧版本的 mssql 驱动程序(现在包含在此连接器中)在 hadoop 环境中出现此问题。 如果您使用以前的 Azure SQL 连接器,并为了 Microsoft Entra 身份验证的兼容性手动将驱动程序安装到该群集上,则需要删除这些驱动程序。
修复问题的步骤:
如果使用泛型 Hadoop 环境,请检查并删除 mssql jar:
rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar
。 如果使用 Databricks,请添加全局脚本或群集 init 脚本以从/databricks/jars
文件夹中删除旧版本的 mssql 驱动程序,或将此行添加到现有脚本:rm /databricks/jars/*mssql*
添加
adal4j
和mssql
包。 例如,可以使用 Maven,但任何方法都应该可以使用。谨慎
不要以这种方式安装 SQL spark 连接器。
将驱动程序类添加到连接配置。 例如:
connectionProperties = { `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver` }`
有关更多信息和说明,请参阅决议 https://github.com/microsoft/sql-spark-connector/issues/26。
入门
适用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器基于 Spark DataSourceV1 API 和 SQL Server 批量 API,并使用与内置 JDBC Spark-SQL 连接器相同的接口。 通过此集成,只需使用 更新格式参数 com.microsoft.sqlserver.jdbc.spark
即可轻松集成连接器并迁移现有 Spark 作业。
若要在项目中包括连接器,请下载此存储库并使用 SBT 生成 jar。
写入新的 SQL 表
警告
默认情况下,如果表已存在于数据库中,则模式 overwrite
首先删除该表。 请谨慎使用此选项以避免意外数据丢失。
如果使用模式 overwrite
时,如果不使用用于重新创建表的选项 truncate
,索引将丢失。 列存储表将成为一个堆。 如果要维护现有索引,请同时指定值为 true 的选项 truncate
。 例如,.option("truncate","true")
。
server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
追加到 SQL 表
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
指定隔离级别
在数据库执行大容量插入时,此连接器默认使用 READ_COMMITTED
隔离级别。 如果要覆盖隔离级别,请使用 mssqlIsolationLevel
选项。
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
从 SQL 表读取
jdbcDF = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password).load()
Microsoft Entra 身份验证
包含服务主体的 Python 示例
context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]
jdbc_db = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("accessToken", access_token) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
包含 Active Directory 密码的 Python 示例
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user_name) \
.option("password", password) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
必须安装必需的依赖项,才能使用 Active Directory 进行身份验证。
使用 ActiveDirectoryPassword 时,user
的格式应为 UPN 格式,例如 username@domainname.com
。
对于 Scala,需要安装_com.microsoft.aad.adal4j_
构件。
对于 Python,_adal_
需要安装库。 这可通过 pip 获得。
查看 示例笔记本 以获取示例。
支持
用于 Azure SQL 和 SQL Server 的 Apache Spark 连接器是一个开源项目。 此连接器没有任何Microsoft支持。 有关连接器的疑问或问题,请在此项目存储库中创建问题。 连接器社区处于活动状态,并对提交情况进行监视。