Delen via


Realtime big data-analyses versnellen met behulp van de Spark-connector

Van toepassing op: Azure SQL DatabaseAzure SQL Managed Instance

Notitie

Vanaf september 2020 wordt deze connector niet actief onderhouden. Apache Spark Verbinding maken or voor SQL Server en Azure SQL is nu echter beschikbaar, met ondersteuning voor Python- en R-bindingen, een eenvoudiger te gebruiken interface voor het bulksgewijs invoegen van gegevens en vele andere verbeteringen. We raden u ten zeerste aan om de nieuwe connector te evalueren en te gebruiken in plaats van deze. De informatie over de oude connector (deze pagina) wordt alleen bewaard voor archiveringsdoeleinden.

Met de Spark-connector kunnen databases in Azure SQL Database, Azure SQL Managed Instance en SQL Server fungeren als de invoergegevensbron of uitvoergegevenssink voor Spark-taken. Hiermee kunt u realtime transactionele gegevens gebruiken in big data-analyses en resultaten behouden voor ad-hocquery's of rapportages. Vergeleken met de ingebouwde JDBC-connector biedt deze connector de mogelijkheid om gegevens bulksgewijs in uw database in te voegen. Het kan beter presteren dan rij-op-rij-invoeging met 10x tot 20x snellere prestaties. De Spark-connector ondersteunt verificatie met Microsoft Entra ID (voorheen Azure Active Directory) om verbinding te maken met Azure SQL Database en Azure SQL Managed Instance, zodat u verbinding kunt maken met uw database vanuit Azure Databricks met behulp van uw Microsoft Entra-account. Het biedt vergelijkbare interfaces met de ingebouwde JDBC-connector. Het is eenvoudig om uw bestaande Spark-taken te migreren om deze nieuwe connector te gebruiken.

Notitie

Microsoft Entra-id is de nieuwe naam voor Azure Active Directory (Azure AD). Op dit moment wordt de documentatie bijgewerkt.

Een Spark-connector downloaden en bouwen

De GitHub-opslagplaats voor de oude connector waarnaar eerder vanaf deze pagina is gekoppeld, wordt niet actief onderhouden. In plaats daarvan raden we u sterk aan om de nieuwe connector te evalueren en te gebruiken.

Officiƫle ondersteunde versies

Onderdeel Versie
Apache Spark 2.0.2 of hoger
Scala 2.10 of hoger
Microsoft JDBC-stuurprogramma voor SQL Server 6.2 of hoger
Microsoft SQL Server SQL Server 2008 of hoger
Azure SQL-database Ondersteund
Azure SQL Managed Instance Ondersteund

De Spark-connector maakt gebruik van het Microsoft JDBC-stuurprogramma voor SQL Server om gegevens te verplaatsen tussen Spark-werkknooppunten en -databases:

De gegevensstroom is als volgt:

  1. Het Spark-hoofdknooppunt maakt verbinding met databases in SQL Database of SQL Server en laadt gegevens uit een specifieke tabel of met behulp van een specifieke SQL-query.
  2. Het Spark-hoofdknooppunt distribueert gegevens naar werkknooppunten voor transformatie.
  3. Het werkknooppunt maakt verbinding met databases die verbinding maken met SQL Database en SQL Server en schrijft gegevens naar de database. De gebruiker kan ervoor kiezen om rij-op-rij in te voegen of bulksgewijs in te voegen.

In het volgende diagram ziet u de gegevensstroom.

Diagram shows the described flow, with a master node connecting directly to the database and connecting to three worker nodes, which connect to the database.

De Spark-connector bouwen

Op dit moment maakt het connectorproject gebruik van Maven. Als u de connector wilt bouwen zonder afhankelijkheden, kunt u het volgende uitvoeren:

  • mvn clean package
  • De nieuwste versies van het JAR downloaden uit de releasemap
  • De SQL Database Spark JAR opnemen

gegevens Verbinding maken en lezen met behulp van de Spark-connector

U kunt vanuit een Spark-taak verbinding maken met databases in SQL Database en SQL Server om gegevens te lezen of te schrijven. U kunt ook een DML- of DDL-query uitvoeren in databases in SQL Database en SQL Server.

Gegevens lezen uit Azure SQL en SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients",
  "user"           -> "username",
  "password"       -> "*********",
  "connectTimeout" -> "5", //seconds
  "queryTimeout"   -> "5"  //seconds
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Gegevens lezen uit Azure SQL en SQL Server met een opgegeven SQL-query

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "queryCustom"  -> "SELECT TOP 100 * FROM dbo.Clients WHERE PostalCode = 98074" //Sql query
  "user"         -> "username",
  "password"     -> "*********",
))

//Read all data in table dbo.Clients
val collection = sqlContext.read.sqlDB(config)
collection.show()

Gegevens schrijven naar Azure SQL en SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Aquire a DataFrame collection (val collection)

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "dbTable"      -> "dbo.Clients",
  "user"         -> "username",
  "password"     -> "*********"
))

import org.apache.spark.sql.SaveMode
collection.write.mode(SaveMode.Append).sqlDB(config)

DML- of DDL-query uitvoeren in Azure SQL en SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
val query = """
              |UPDATE Customers
              |SET ContactName = 'Alfred Schmidt', City = 'Frankfurt'
              |WHERE CustomerID = 1;
            """.stripMargin

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "*********",
  "queryCustom"  -> query
))

sqlContext.sqlDBQuery(config)

Verbinding maken van Spark met behulp van Microsoft Entra-verificatie

U kunt verbinding maken met SQL Database en SQL Managed Instance met behulp van Microsoft Entra-verificatie. Gebruik Microsoft Entra-verificatie om identiteiten van databasegebruikers centraal te beheren en als alternatief voor SQL-verificatie.

Verbinding maken met de verificatiemodus ActiveDirectoryPassword

Installatievereiste

Als u de ActiveDirectoryPassword-verificatiemodus gebruikt, moet u microsoft-authentication-library-for-java en de bijbehorende afhankelijkheden downloaden en opnemen in het Java-buildpad.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "user"           -> "username",
  "password"       -> "*********",
  "authentication" -> "ActiveDirectoryPassword",
  "encrypt"        -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Verbinding maken met behulp van een toegangstoken

Installatievereiste

Als u de verificatiemodus op basis van een toegangstoken gebruikt, moet u microsoft-authentication-library-for-java en de bijbehorende afhankelijkheden downloaden en opnemen in het Java-buildpad.

Zie Microsoft Entra-verificatie gebruiken voor meer informatie over het ophalen van een toegangstoken voor uw database in Azure SQL Database of Azure SQL Managed Instance.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"                   -> "mysqlserver.database.windows.net",
  "databaseName"          -> "MyDatabase",
  "accessToken"           -> "access_token",
  "hostNameInCertificate" -> "*.database.windows.net",
  "encrypt"               -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Gegevens schrijven met behulp van bulksgewijs invoegen

De traditionele jdbc-connector schrijft gegevens naar uw database met behulp van rij-per-rij-invoeging. U kunt de Spark-connector gebruiken om gegevens naar Azure SQL en SQL Server te schrijven met behulp van bulksgewijs invoegen. Dit verbetert de schrijfprestaties aanzienlijk bij het laden van grote gegevenssets of het laden van gegevens in tabellen waarin een kolomarchiefindex wordt gebruikt.

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/**
  Add column Metadata.
  If not specified, metadata is automatically added
  from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.

Volgende stappen

Als u dat nog niet hebt gedaan, downloadt u de Spark-connector vanuit de GitHub-opslagplaats azure-sqldb-spark en verkent u de aanvullende resources in de opslagplaats:

U kunt ook de Handleiding voor Apache Spark SQL, DataFrames en Gegevenssets en de Documentatie voor Azure Databricks bekijken.