Bagikan melalui


Mempercepat analitik big data real time menggunakan konektor Spark

Berlaku untuk: Azure SQL Database Azure SQL Managed Instance

Catatan

Pada 20 Sep 2020, konektor ini tidak dirawat secara aktif. Namun, Apache Spark Connector untuk SQL Server dan Azure SQL sekarang tersedia, dengan dukungan untuk pengikatan Python dan R, antarmuka yang lebih mudah digunakan untuk menyisipkan data secara massal, dan banyak peningkatan lainnya. Kami sangat menyarankan agar Anda mengevaluasi dan menggunakan konektor baru bukan yang satu ini. Informasi tentang konektor lama (halaman ini) hanya disimpan untuk tujuan arsip.

Konektor Spark memungkinkan database di Azure SQL Database, Azure SQL Managed Instance, dan SQL Server untuk bertindak sebagai sumber data input atau sink data output untuk pekerjaan Spark. Ini memungkinkan Anda untuk menggunakan data transaksional real time dalam analitik big data dan bertahan hasil untuk kueri atau pelaporan ad hoc. Dibandingkan dengan konektor JDBC bawaan, konektor ini menyediakan kemampuan untuk menyisipkan data secara massal ke database Anda. Konektor ini dapat mengungguli penyisipan baris demi baris dengan performa 10x hingga 20x lebih cepat. Konektor Spark mendukung autentikasi dengan ID Microsoft Entra (sebelumnya Azure Active Directory) untuk menyambungkan ke Azure SQL Database dan Azure SQL Managed Instance, memungkinkan Anda menyambungkan database Anda dari Azure Databricks menggunakan akun Microsoft Entra Anda. Konektor ini menyediakan antarmuka yang serupa dengan konektor JDBC bawaan. Sangat mudah untuk memigrasikan pekerjaan Spark Anda yang ada untuk menggunakan konektor baru ini.

Catatan

ID Microsoft Entra sebelumnya dikenal sebagai Azure Active Directory (Azure AD).

Mengunduh dan membangun konektor Spark

Repositori GitHub untuk konektor lama yang sebelumnya ditautkan dari halaman ini tidak dipertahankan secara aktif. Sebagai gantinya, sebaiknya Anda mengevaluasi dan menggunakan konektor baru.

Versi resmi yang didukung

Komponen Versi
Apache Spark 2.0.2 atau yang lebih baru
Scala 2.10 atau yang lebih baru
Driver Microsoft JDBC untuk SQL Server 6.2 atau yang lebih baru
Microsoft SQL Server SQL Server 2008 atau yang lebih baru
Database Azure SQL Didukung
Instans Terkelola Azure SQL Didukung

Konektor Spark menggunakan Driver Microsoft JDBC untuk SQL Server untuk memindahkan data antara simpul pekerja Spark dan database:

Aliran datanya adalah sebagai berikut:

  1. Simpul master Spark tersambung ke database di SQL Database atau SQL Server dan memuat data dari tabel tertentu atau menggunakan kueri SQL tertentu.
  2. Simpul master Spark mendistribusikan data ke simpul pekerja untuk transformasi.
  3. Simpul Pekerja tersambung ke database yang tersambung ke SQL Database dan SQL Server dan menulis data ke database. Pengguna bisa memilih untuk menggunakan penyisipan baris demi baris atau menyisipkan secara massal.

Diagram berikut menggambarkan aliran data.

Diagram menunjukkan alur yang dijelaskan, dengan simpul master yang tersambung langsung ke database dan menyambungkan ke tiga node pekerja, yang tersambung ke database.

Membangun konektor Spark

Saat ini, proyek konektor menggunakan maven. Untuk membangun konektor tanpa dependensi, Anda dapat menjalankan:

  • paket bersih mvn
  • Mengunduh versi terbaru JAR dari folder rilis
  • Menyertakan SQL Database Spark JAR

Menyambungkan dan membaca data menggunakan konektor Spark

Anda dapat menyambungkan ke database di SQL Database dan SQL Server dari pekerjaan Spark untuk membaca atau menulis data. Anda juga bisa menjalankan kueri DML atau DDL dalam database di SQL Database dan SQL Server.

Membaca data dari Azure SQL dan SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients",
  "user"           -> "username",
  "password"       -> "*********",
  "connectTimeout" -> "5", //seconds
  "queryTimeout"   -> "5"  //seconds
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Membaca data dari Azure SQL dan SQL Server dengan kueri SQL tertentu

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "queryCustom"  -> "SELECT TOP 100 * FROM dbo.Clients WHERE PostalCode = 98074" //Sql query
  "user"         -> "username",
  "password"     -> "*********",
))

//Read all data in table dbo.Clients
val collection = sqlContext.read.sqlDB(config)
collection.show()

Membaca data dari Azure SQL dan SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Aquire a DataFrame collection (val collection)

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "dbTable"      -> "dbo.Clients",
  "user"         -> "username",
  "password"     -> "*********"
))

import org.apache.spark.sql.SaveMode
collection.write.mode(SaveMode.Append).sqlDB(config)

Menjalankan kueri DML atau DDL di Azure SQL dan SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
val query = """
              |UPDATE Customers
              |SET ContactName = 'Alfred Schmidt', City = 'Frankfurt'
              |WHERE CustomerID = 1;
            """.stripMargin

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "*********",
  "queryCustom"  -> query
))

sqlContext.sqlDBQuery(config)

Menyambungkan dari Spark menggunakan autentikasi Microsoft Entra

Anda dapat tersambung ke SQL Database dan SQL Managed Instance menggunakan autentikasi Microsoft Entra. Gunakan autentikasi Microsoft Entra untuk mengelola identitas pengguna database secara terpusat dan sebagai alternatif untuk autentikasi SQL.

Menyambungkan menggunakan Mode Autentikasi ActiveDirectoryPassword

Persyaratan penyiapan

Jika Anda menggunakan mode autentikasi ActiveDirectoryPassword, Anda perlu mengunduh microsoft-authentication-library-for-java dan dependensinya, dan menyertakannya di jalur build Java.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "user"           -> "username",
  "password"       -> "*********",
  "authentication" -> "ActiveDirectoryPassword",
  "encrypt"        -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Menyambungkan menggunakan token akses

Persyaratan penyiapan

Jika Anda menggunakan mode autentikasi berbasis token akses, Anda perlu mengunduh microsoft-authentication-library-for-java dan dependensinya, dan menyertakannya di jalur build Java.

Lihat Menggunakan autentikasi Microsoft Entra untuk mempelajari cara mendapatkan token akses ke database Anda di Azure SQL Database atau Azure SQL Managed Instance.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"                   -> "mysqlserver.database.windows.net",
  "databaseName"          -> "MyDatabase",
  "accessToken"           -> "access_token",
  "hostNameInCertificate" -> "*.database.windows.net",
  "encrypt"               -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Menulis data menggunakan sisipan massal

Konektor jdbc tradisional menulis data ke database Anda menggunakan penyisipan baris demi baris. Anda dapat menggunakan konektor Spark untuk menulis data ke Azure SQL dan SQL Server menggunakan sisipan massal. Ini secara signifikan meningkatkan performa tulis saat memuat himpunan data besar atau memuat data ke dalam tabel tempat indeks penyimpan kolom digunakan.

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/**
  Add column Metadata.
  If not specified, metadata is automatically added
  from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.

Langkah berikutnya

Jika Anda belum melakukannya, unduh konektor Spark dari repositori GitHub azure-sqldb-spark dan jelajahi sumber daya tambahan di repositori:

Anda mungkin juga ingin meninjau Apache Spark SQL, DataFrames, dan Panduan Himpunan Data dan dokumentasi Azure Databricks.