Valós idejű big data-elemzés felgyorsítása a Spark-összekötővel

A következőre vonatkozik: Azure SQL DatabaseFelügyelt Azure SQL-példány

Megjegyzés:

2020 szeptemberétől ezt az összekötőt nem tartjuk aktívan karban. Az SQL Serverhez és az Azure SQL-hez készült Apache Spark Csatlakozás or azonban már elérhető, a Python- és R-kötések támogatásával, az adatok tömeges beszúrását megkönnyítő felülettel és számos egyéb fejlesztéssel. Határozottan javasoljuk, hogy ne ezt, hanem az új összekötőt értékelje ki és használja. A régi összekötőre (ezen az oldalon) vonatkozó információk csak archiválási célból maradnak meg.

A Spark-összekötő lehetővé teszi, hogy az Azure SQL Database, az Azure SQL Managed Instance és az SQL Server adatbázisai bemeneti adatforrásként vagy kimeneti adatgyűjtőként működjenek a Spark-feladatokhoz. Lehetővé teszi, hogy valós idejű tranzakciós adatokat használjon fel a big data-elemzésekben, és megőrizze az eredményeket alkalmi lekérdezésekhez vagy jelentésekhez. A beépített JDBC-összekötőhöz képest ez az összekötő lehetővé teszi az adatok tömeges beszúrását az adatbázisba. 10-20-szor gyorsabb teljesítménnyel képes felülmúlni a sorról sorra történő beszúrást. A Spark-összekötő támogatja a Microsoft Entra-azonosítóval (korábbi nevén Azure Active Directory) való hitelesítést az Azure SQL Database-hez és a felügyelt Azure SQL-példányhoz való csatlakozáshoz, lehetővé téve az adatbázis azure Databricksből való csatlakoztatását a Microsoft Entra-fiókjával. Hasonló interfészeket biztosít a beépített JDBC-összekötőhöz. Az új összekötő használatához egyszerűen migrálhatja a meglévő Spark-feladatokat.

Megjegyzés:

A Microsoft Entra ID az Azure Active Directory (Azure AD) új neve. Jelenleg frissítjük a dokumentációt.

Spark-összekötő letöltése és létrehozása

A lapról korábban csatolt régi összekötő GitHub-adattára nincs aktívan karbantartva. Ehelyett határozottan javasoljuk, hogy értékelje ki és használja az új összekötőt.

Hivatalos támogatott verziók

Összetevő Verzió
Apache Spark 2.0.2 vagy újabb
Scala 2.10 vagy újabb
Microsoft JDBC-illesztőprogram SQL Serverhez 6.2 vagy újabb
Microsoft SQL Server SQL Server 2008 vagy újabb
Azure SQL Database Supported
Azure SQL Managed Instance Supported

A Spark-összekötő az SQL ServerHez készült Microsoft JDBC-illesztőprogramot használja az adatok Spark-feldolgozó csomópontok és adatbázisok közötti áthelyezéséhez:

Az adatfolyam a következő:

  1. A Spark fő csomópontja az SQL Database vagy az SQL Server adatbázisaihoz csatlakozik, és adatokat tölt be egy adott táblából vagy egy adott SQL-lekérdezés használatával.
  2. A Spark fő csomópontja osztja el az adatokat a feldolgozó csomópontok között átalakítás céljából.
  3. A Feldolgozó csomópont az SQL Database-hez és az SQL Serverhez csatlakozó adatbázisokhoz csatlakozik, és adatokat ír az adatbázisba. A felhasználó választhatja a sorról sorra történő beszúrást vagy a tömeges beszúrást.

Az alábbi ábra az adatfolyamot szemlélteti.

Diagram shows the described flow, with a master node connecting directly to the database and connecting to three worker nodes, which connect to the database.

A Spark-összekötő létrehozása

Az összekötő projekt jelenleg maven-t használ. Ha függőségek nélkül szeretné létrehozni az összekötőt, futtassa a következőt:

  • mvn tiszta csomag
  • Töltse le a JAR legújabb verzióit a kiadási mappából
  • Az SQL Database Spark JAR-jának belefoglalása

adatok Csatlakozás és olvasása a Spark-összekötővel

Az SQL Database-ben és az SQL Serverben lévő adatbázisokhoz Spark-feladatból csatlakozhat az adatok olvasásához vagy írásához. DML- vagy DDL-lekérdezést is futtathat az SQL Database és az SQL Server adatbázisaiban.

Adatok olvasása az Azure SQL-ből és az SQL Serverről

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients",
  "user"           -> "username",
  "password"       -> "*********",
  "connectTimeout" -> "5", //seconds
  "queryTimeout"   -> "5"  //seconds
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Adatok olvasása az Azure SQL-ből és az SQL Serverről megadott SQL-lekérdezéssel

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "queryCustom"  -> "SELECT TOP 100 * FROM dbo.Clients WHERE PostalCode = 98074" //Sql query
  "user"         -> "username",
  "password"     -> "*********",
))

//Read all data in table dbo.Clients
val collection = sqlContext.read.sqlDB(config)
collection.show()

Adatok írása az Azure SQL-be és az SQL Serverbe

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Aquire a DataFrame collection (val collection)

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "dbTable"      -> "dbo.Clients",
  "user"         -> "username",
  "password"     -> "*********"
))

import org.apache.spark.sql.SaveMode
collection.write.mode(SaveMode.Append).sqlDB(config)

DML- vagy DDL-lekérdezés futtatása az Azure SQL-ben és az SQL Serveren

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
val query = """
              |UPDATE Customers
              |SET ContactName = 'Alfred Schmidt', City = 'Frankfurt'
              |WHERE CustomerID = 1;
            """.stripMargin

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "*********",
  "queryCustom"  -> query
))

sqlContext.sqlDBQuery(config)

Csatlakozás a Sparkból Microsoft Entra-hitelesítéssel

Microsoft Entra-hitelesítéssel csatlakozhat az SQL Database-hez és a felügyelt SQL-példányhoz. A Microsoft Entra-hitelesítéssel központilag kezelheti az adatbázis-felhasználók identitásait, és alternatívaként használhatja az SQL-hitelesítést.

ActiveDirectoryPassword hitelesítési mód Csatlakozás

Beállítási követelmény

Ha ActiveDirectoryPassword hitelesítési módot használ, le kell töltenie a microsoft-authentication-library-for-java fájlt és annak függőségeit, és fel kell vennie őket a Java buildelési útvonalára.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "user"           -> "username",
  "password"       -> "*********",
  "authentication" -> "ActiveDirectoryPassword",
  "encrypt"        -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Csatlakozás hozzáférési jogkivonat használatával

Beállítási követelmény

Ha hozzáférési jogkivonat-alapú hitelesítési módot használ, le kell töltenie a Microsoft-authentication-library-for-Java-t és annak függőségeit, és fel kell vennie őket a Java buildelési útvonalára.

A Microsoft Entra-hitelesítés használatával megtudhatja, hogyan szerezhet be hozzáférési jogkivonatot az adatbázishoz az Azure SQL Database-ben vagy a felügyelt Azure SQL-példányban.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"                   -> "mysqlserver.database.windows.net",
  "databaseName"          -> "MyDatabase",
  "accessToken"           -> "access_token",
  "hostNameInCertificate" -> "*.database.windows.net",
  "encrypt"               -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Adatok írása tömeges beszúrással

A hagyományos jdbc-összekötő sorról sorra történő beszúrással adatokat ír az adatbázisba. A Spark-összekötővel tömeges beszúrással adatokat írhat az Azure SQL-be és az SQL Serverbe. Jelentősen javítja az írási teljesítményt nagy adathalmazok betöltésekor, vagy az adatok olyan táblákba való betöltésekor, ahol oszloptároló-indexet használnak.

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/**
  Add column Metadata.
  If not specified, metadata is automatically added
  from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.

További lépések

Ha még nem tette meg, töltse le a Spark-összekötőt az Azure-sqldb-spark GitHub-adattárból, és fedezze fel az adattár további erőforrásait:

Érdemes lehet áttekinteni az Apache Spark SQL, a DataFrames és az Adatkészletek útmutatóját , valamint az Azure Databricks dokumentációját is.