Apache Spark 连接器:SQL Server 和 Azure SQL

适用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器是一种高性能连接器,可用于在大数据分析中使用事务数据,并为即席查询或报告保留结果。 连接器允许将任何 SQL 数据库(本地或云中)用作 Spark 作业的输入数据源或输出数据接收器。

注释

当前此连接器不在主动维护中。 本文仅用于存档目的。

此库包含适用于 SQL Server 和 Azure SQL 平台的 Apache Spark 连接器的源代码。

Apache Spark 是用于大规模数据处理的统一分析引擎。

Maven 提供了两个版本的连接器:2.4.x 兼容版本和 3.0.x 兼容版本。 从 maven.org 下载连接器 ,并使用坐标导入连接器:

连接器 Maven 坐标
Spark 2.4.x 兼容连接器 com.microsoft.azure:spark-mssql-connector:1.0.2
Spark 3.0.x 兼容连接器 com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Spark 3.1.x 兼容连接器 com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

还可以从源生成连接器,也可以从 GitHub 中的“发布”部分下载 jar。 有关连接器的最新信息,请参阅 SQL Spark 连接器 GitHub 存储库

支持的功能

  • 支持所有 Spark 绑定(Scala、Python、R)
  • 基本身份验证和 Active Directory (AD) 密钥选项卡支持
  • 重排 dataframe 写入支持
  • 支持写入 SQL Server 大数据群集中的 SQL Server 单一实例和数据池
  • Sql Server 单实例的可靠连接器支持
组件 支持的版本
Apache Spark 2.4.x、3.0.x、3.1.x
Scala(编程语言) 2.11, 2.12
用于 SQL Server 的 Microsoft JDBC 驱动程序 8.4
Microsoft SQL Server SQL Server 2008 或更高版本
Azure SQL 数据库 已支持

支持的选项

适用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器支持此处定义的选项: SQL DataSource JDBC

此外,还支持以下选项:

选项 违约 DESCRIPTION
reliabilityLevel BEST_EFFORT BEST_EFFORTNO_DUPLICATESNO_DUPLICATES 在执行程序重启情况下实现了可靠的插入
dataPoolDataSource none none 表示未设置值,连接器应写入 SQL Server 单实例。 将此值设置为数据源名称以在大数据群集中写入数据池表
isolationLevel READ_COMMITTED 指定 隔离级别
tableLock false 使用 TABLOCK 选项进行插入操作,以提高写入性能。
schemaCheckEnabled true 在设置为 false 时禁用严格的数据帧和 SQL 表架构检查。

其他 大容量复制选项 可以作为选项来设置,并将在写入过程中传递给 dataframe API。

性能比较

适用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器比用于写入 SQL Server 的通用 JDBC 连接器快 15 倍。 性能特性因类型、数据量和使用的选项而异,并可能在不同运行中显示差异。 以下性能结果是在 Spark dataframe中使用 143.9M 行更新 SQL 表所需的时间。 Spark 是通过读取dataframe使用 spark store_salesTPCDS 基准生成的 HDFS 表构造的。 将读取 store_salesdataframe 的时间排除在外。 结果是基于三次运行的平均值。

连接器类型 选项 DESCRIPTION 写作时间
JDBCConnector 违约 具有默认选项的通用 JDBC 连接器 1,385 秒
sql-spark-connector BEST_EFFORT 使用默认选项尽最大努力sql-spark-connector 580 秒
sql-spark-connector NO_DUPLICATES 可靠 sql-spark-connector 709 秒
sql-spark-connector BEST_EFFORT + tabLock=true sql-spark-connector启用表锁的最佳做法 72 秒
sql-spark-connector NO_DUPLICATES + tabLock=true 启用表锁后的可靠性sql-spark-connector 198 秒

配置

  • Spark 配置:num_executors = 20,executor_memory = '1664 MB',executor_cores = 2
  • Data Gen 配置:scale_factor = 50,partitioned_tables = true
  • 包含 n 个行的数据文件 store_sales 143,997,590

环境

  • SQL Server 大数据群集 CU5
  • master + 6 个节点
  • 每个节点第 5 代服务器,512 GB Ram,每个节点 4 TB NVM,NIC 10 GB

常见问题

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

使用较旧版本的 mssql 驱动程序(现在包含在此连接器中)在 hadoop 环境中出现此问题。 如果您使用以前的 Azure SQL 连接器,并为了 Microsoft Entra 身份验证的兼容性手动将驱动程序安装到该群集上,则需要删除这些驱动程序。

修复问题的步骤:

  1. 如果使用泛型 Hadoop 环境,请检查并删除 mssql jar: rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar。 如果使用 Databricks,请添加全局脚本或群集 init 脚本以从 /databricks/jars 文件夹中删除旧版本的 mssql 驱动程序,或将此行添加到现有脚本: rm /databricks/jars/*mssql*

  2. 添加adal4jmssql包。 例如,可以使用 Maven,但任何方法都应该可以使用。

    谨慎

    不要以这种方式安装 SQL spark 连接器。

  3. 将驱动程序类添加到连接配置。 例如:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

有关更多信息和说明,请参阅决议 https://github.com/microsoft/sql-spark-connector/issues/26

入门

适用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器基于 Spark DataSourceV1 API 和 SQL Server 批量 API,并使用与内置 JDBC Spark-SQL 连接器相同的接口。 通过此集成,只需使用 更新格式参数 com.microsoft.sqlserver.jdbc.spark即可轻松集成连接器并迁移现有 Spark 作业。

若要在项目中包括连接器,请下载此存储库并使用 SBT 生成 jar。

写入新的 SQL 表

警告

默认情况下,如果表已存在于数据库中,则模式 overwrite 首先删除该表。 请谨慎使用此选项以避免意外数据丢失。

如果使用模式 overwrite 时,如果不使用用于重新创建表的选项 truncate ,索引将丢失。 列存储表将成为一个堆。 如果要维护现有索引,请同时指定值为 true 的选项 truncate 。 例如,.option("truncate","true")

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

追加到 SQL 表

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

指定隔离级别

在数据库执行大容量插入时,此连接器默认使用 READ_COMMITTED 隔离级别。 如果要覆盖隔离级别,请使用 mssqlIsolationLevel 选项。

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

从 SQL 表读取

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Microsoft Entra 身份验证

包含服务主体的 Python 示例

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

包含 Active Directory 密码的 Python 示例

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

必须安装必需的依赖项,才能使用 Active Directory 进行身份验证。

使用 ActiveDirectoryPassword 时,user 的格式应为 UPN 格式,例如 username@domainname.com

对于 Scala,需要安装_com.microsoft.aad.adal4j_构件。

对于 Python,_adal_需要安装库。 这可通过 pip 获得。

查看 示例笔记本 以获取示例。

支持

用于 Azure SQL 和 SQL Server 的 Apache Spark 连接器是一个开源项目。 此连接器没有任何Microsoft支持。 有关连接器的疑问或问题,请在此项目存储库中创建问题。 连接器社区处于活动状态,并对提交情况进行监视。

请访问 SQL Spark 连接器 GitHub 存储库