使用 Spark 連接器加速即時巨量數據分析

適用於:Azure SQL DatabaseAzure SQL 受控執行個體

注意

自 2020 年 9 月起,此連接器不會主動維護。 不過,現在已提供適用於 SQL Server 和 Azure SQL 的 Apache Spark 連線 or,支援 Python 和 R 系結、更容易使用的介面來大量插入數據,以及其他許多改進功能。 我們強烈建議您評估和使用新的連接器,而不是這個連接器。 舊連接器的相關信息 (此頁面) 只會保留以供封存之用。

Spark 連接器可讓 Azure SQL 資料庫、Azure SQL 受控執行個體 和 SQL Server 中的資料庫作為 Spark 作業的輸入數據源或輸出數據接收。 它可讓您在巨量數據分析中使用即時事務數據,並保存特定查詢或報告的結果。 相較於內建 JDBC 連接器,此連接器可讓您將數據大量插入資料庫。 它可以以 10 倍到 20 倍的速度提升效能,以超過逐列插入效能。 Spark 連接器支援使用 Microsoft Entra 識別符 (先前稱為 Azure Active Directory) 進行驗證,以連線到 Azure SQL 資料庫 和 Azure SQL 受控執行個體,讓您能夠使用 Microsoft Entra 帳戶從 Azure Databricks 聯機資料庫。 它提供內建 JDBC 連接器的類似介面。 您可以輕鬆地移轉現有的 Spark 作業,以使用此新連接器。

注意

Microsoft Entra 標識符 先前稱為 Azure Active Directory (Azure AD)。

下載並建置Spark連接器

先前從此頁面連結至的舊連接器 GitHub 存放庫不會主動維護。 相反地,我們強烈建議您評估和使用 新的連接器

官方支援的版本

元件 版本
Apache Spark 2.0.2 或更新版本
Scala 2.10 或更新版本
Microsoft JDBC Driver for SQL Server 6.2 或更新版本
Microsoft SQL Server SQL Server 2008 或更新版本
Azure SQL Database 支援
Azure SQL 受控執行個體 支援

Spark 連接器會利用 Microsoft JDBC Driver for SQL Server 在 Spark 背景工作節點和資料庫之間行動數據:

資料流如下所示:

  1. Spark 主要節點會連線到 SQL 資料庫 或 SQL Server 中的資料庫,並從特定數據表或使用特定的 SQL 查詢載入數據。
  2. Spark 主要節點會將數據散發至背景工作節點以進行轉換。
  3. 背景工作節點會連線到連接到 SQL 資料庫 和 SQL Server 的資料庫,並將數據寫入資料庫。 用戶可以選擇使用逐列插入或大量插入。

下圖說明數據流。

Diagram shows the described flow, with a master node connecting directly to the database and connecting to three worker nodes, which connect to the database.

建置Spark連接器

連接器專案目前使用 maven。 若要在沒有相依性的情況下建置連接器,您可以執行:

  • mvn 清理套件
  • 從發行資料夾下載最新版的 JAR
  • 包含 SQL 資料庫 Spark JAR

使用 Spark 連接器 連線 和讀取數據

您可以從 Spark 作業連線到 SQL 資料庫 和 SQL Server 中的資料庫,以讀取或寫入數據。 您也可以在 SQL 資料庫 和 SQL Server 的資料庫中執行 DML 或 DDL 查詢。

從 Azure SQL 和 SQL Server 讀取數據

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients",
  "user"           -> "username",
  "password"       -> "*********",
  "connectTimeout" -> "5", //seconds
  "queryTimeout"   -> "5"  //seconds
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

使用指定的 SQL 查詢從 Azure SQL 和 SQL Server 讀取數據

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "queryCustom"  -> "SELECT TOP 100 * FROM dbo.Clients WHERE PostalCode = 98074" //Sql query
  "user"         -> "username",
  "password"     -> "*********",
))

//Read all data in table dbo.Clients
val collection = sqlContext.read.sqlDB(config)
collection.show()

將數據寫入 Azure SQL 和 SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Aquire a DataFrame collection (val collection)

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "dbTable"      -> "dbo.Clients",
  "user"         -> "username",
  "password"     -> "*********"
))

import org.apache.spark.sql.SaveMode
collection.write.mode(SaveMode.Append).sqlDB(config)

在 Azure SQL 和 SQL Server 中執行 DML 或 DDL 查詢

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
val query = """
              |UPDATE Customers
              |SET ContactName = 'Alfred Schmidt', City = 'Frankfurt'
              |WHERE CustomerID = 1;
            """.stripMargin

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "*********",
  "queryCustom"  -> query
))

sqlContext.sqlDBQuery(config)

使用 Microsoft Entra 驗證從 Spark 連線

您可以使用 Microsoft Entra 驗證連線到 SQL 資料庫 和 SQL 受管理執行個體。 使用 Microsoft Entra 驗證集中管理資料庫使用者的身分識別,以及作為 SQL 驗證的替代方案。

使用 ActiveDirectoryPassword 驗證模式 連線

安裝需求

如果您使用 ActiveDirectoryPassword 驗證模式,您必須下載 microsoft-authentication-library-for-java 及其相依性,並將其包含在 Java 建置路徑中。

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "user"           -> "username",
  "password"       -> "*********",
  "authentication" -> "ActiveDirectoryPassword",
  "encrypt"        -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

使用存取令牌 連線

安裝需求

如果您使用存取令牌型驗證模式,您必須下載 microsoft-authentication-library-for-java 及其相依性,並將其包含在 Java 建置路徑中。

請參閱使用 Microsoft Entra 驗證,瞭解如何在 Azure SQL 資料庫 或 Azure SQL 受控執行個體 中取得資料庫的存取令牌。

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"                   -> "mysqlserver.database.windows.net",
  "databaseName"          -> "MyDatabase",
  "accessToken"           -> "access_token",
  "hostNameInCertificate" -> "*.database.windows.net",
  "encrypt"               -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

使用大量插入寫入數據

傳統的 jdbc 連接器會使用逐列插入將數據寫入資料庫。 您可以使用 Spark 連接器,使用大量插入將數據寫入 Azure SQL 和 SQL Server。 當載入大型資料集或將資料載入至使用資料行存放區索引的數據表時,它會大幅改善寫入效能。

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/**
  Add column Metadata.
  If not specified, metadata is automatically added
  from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.

下一步

如果您尚未下載 Spark 連接器,請從 azure-sqldb-spark GitHub 存放庫 下載 Spark 連接器,並探索存放庫中的其他資源:

您也可以檢閱 Apache Spark SQL、DataFrame 和數據集指南Azure Databricks 檔