Konektor Apache Spark: SQL Server & Azure SQL
Konektor Apache Spark untuk SQL Server dan Azure SQL adalah konektor berperforma tinggi yang memungkinkan Anda menggunakan data transaksional dalam analitik big data dan mempertahankan hasil untuk kueri atau pelaporan ad hoc. Konektor memungkinkan Anda menggunakan database SQL, lokal, atau di cloud, sebagai sumber data input atau sink data output untuk pekerjaan Spark.
Pustaka ini berisi kode sumber untuk apache Spark Koneksi or untuk SQL Server dan Azure SQL.
Apache Spark adalah mesin analitik terpadu untuk pemrosesan data skala besar.
Ada dua versi konektor yang tersedia melalui Maven, versi yang kompatibel dengan 2.4.x dan versi yang kompatibel dengan 3.0.x. Kedua versi dapat ditemukan di sini dan dapat diimpor menggunakan koordinat di bawah ini:
Konektor | Koordinat Maven |
---|---|
Konektor kompatibel Spark 2.4.x | com.microsoft.azure:spark-mssql-connector:1.0.2 |
Konektor kompatibel Spark 3.0.x | com.microsoft.azure:spark-mssql-connector_2.12:1.1.0 |
Konektor kompatibel Spark 3.1.x | com.microsoft.azure:spark-mssql-connector_2.12:1.2.0 |
Anda juga dapat membangun konektor dari sumber atau mengunduh jar dari bagian Rilis di GitHub. Untuk informasi terbaru tentang konektor, lihat Repositori GitHub konektor SQL Spark.
Fitur yang Didukung
- Dukungan untuk semua pengikatan Spark (Scala, Python, R)
- Autentikasi dasar dan dukungan Tab Kunci Direktori Aktif (AD)
- Dukungan tulis yang diurutkan
dataframe
ulang - Dukungan untuk menulis ke instans Tunggal SQL Server dan Kumpulan Data di SQL Server Kluster Big Data
- Dukungan konektor andal untuk Sql Server Single Instance
Komponen | Versi yang Didukung |
---|---|
Apache Spark | 2.4.x, 3.0.x, 3.1.x |
Scala | 2.11, 2.12 |
Driver Microsoft JDBC untuk SQL Server | 8.4 |
Microsoft SQL Server | SQL Server 2008 atau yang lebih baru |
Azure SQL Database | Didukung |
Opsi yang Didukung
Apache Spark Koneksi or untuk SQL Server dan Azure SQL mendukung opsi yang ditentukan di sini: SQL DataSource JDBC
Selain opsi berikut didukung
Opsi | Default | Deskripsi |
---|---|---|
reliabilityLevel |
BEST_EFFORT |
BEST_EFFORT atau NO_DUPLICATES . NO_DUPLICATES mengimplementasikan sisipan yang andal dalam skenario hidupkan ulang pelaksana |
dataPoolDataSource |
none |
none menyiratkan nilai tidak diatur dan konektor harus menulis ke instans tunggal SQL Server. Atur nilai ini ke nama sumber data untuk menulis tabel kumpulan data di Kluster Big Data |
isolationLevel |
READ_COMMITTED |
Tentukan tingkat isolasi |
tableLock |
false |
Menerapkan sisipkan dengan TABLOCK opsi untuk meningkatkan performa tulis |
schemaCheckEnabled |
true |
Menonaktifkan bingkai data ketat dan pemeriksaan skema tabel sql saat diatur ke false |
Opsi penyalinan massal lainnya dapat diatur sebagai opsi pada dan akan diteruskan dataframe
ke bulkcopy
API saat menulis
Perbandingan performa
Apache Spark Koneksi or untuk SQL Server dan Azure SQL hingga 15x lebih cepat daripada konektor JDBC generik untuk menulis ke SQL Server. Karakteristik performa bervariasi pada jenis, volume data, opsi yang digunakan, dan dapat menunjukkan eksekusi untuk menjalankan variasi. Hasil performa berikut adalah waktu yang diperlukan untuk menimpa tabel SQL dengan 143,9M baris dalam spark dataframe
. Spark dataframe
dibangun dengan membaca store_sales
tabel HDFS yang dihasilkan menggunakan Spark TPCDS Benchmark. Waktu untuk membaca store_sales
dataframe
dikecualikan. Hasilnya rata-rata lebih dari tiga eksekusi.
Jenis Koneksi or | Opsi | Deskripsi | Waktu untuk menulis |
---|---|---|---|
JDBCConnector |
Default | Konektor JDBC generik dengan opsi default | 1385 detik |
sql-spark-connector |
BEST_EFFORT |
Upaya sql-spark-connector terbaik dengan opsi default |
580 detik |
sql-spark-connector |
NO_DUPLICATES |
Dapat diandalkan sql-spark-connector |
709 detik |
sql-spark-connector |
BEST_EFFORT + tabLock=true |
Upaya sql-spark-connector terbaik dengan kunci tabel diaktifkan |
72 detik |
sql-spark-connector |
NO_DUPLICATES + tabLock=true |
Andalkan sql-spark-connector dengan kunci tabel diaktifkan |
198 detik |
Konfigurasi
- Konfigurasi Spark: num_executors = 20, executor_memory = '1664 m', executor_cores = 2
- Konfigurasi Data Gen: scale_factor=50, partitioned_tables=true
- File data
store_sales
dengan nr baris 143.997.590
Lingkungan
- SQL Server Big Data Cluster CU5
master
+ 6 simpul- Setiap server node gen 5, Ram 512 GB, NVM 4 TB per simpul, NIC 10 GB
Masalah yang Umum Dihadapi
java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException
Masalah ini muncul dari penggunaan versi driver mssql yang lebih lama (yang sekarang disertakan dalam konektor ini) di lingkungan hadoop Anda. Jika Anda berasal dari menggunakan Koneksi or Azure SQL sebelumnya dan telah menginstal driver secara manual ke kluster tersebut untuk kompatibilitas autentikasi Microsoft Entra, Anda harus menghapus driver tersebut.
Langkah-langkah untuk memperbaiki masalah:
Jika Anda menggunakan lingkungan Hadoop generik, periksa dan hapus mssql jar:
rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar
. Jika Anda menggunakan Databricks, tambahkan skrip init global atau kluster untuk menghapus versi lama driver mssql dari/databricks/jars
folder, atau tambahkan baris ini ke skrip yang ada:rm /databricks/jars/*mssql*
adal4j
Tambahkan paket danmssql
. Misalnya, Anda dapat menggunakan Maven tetapi cara apa pun harus berfungsi.Perhatian
Jangan instal konektor spark SQL dengan cara ini.
Tambahkan kelas driver ke konfigurasi koneksi Anda. Misalnya:
connectionProperties = { `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver` }`
Untuk informasi dan penjelasan selengkapnya, lihat resolusi ke https://github.com/microsoft/sql-spark-connector/issues/26.
Memulai
Apache Spark Koneksi or untuk SQL Server dan Azure SQL didasarkan pada Spark DataSourceV1 API dan SQL Server Bulk API dan menggunakan antarmuka yang sama dengan konektor JDBC Spark-SQL bawaan. Integrasi ini memungkinkan Anda untuk dengan mudah mengintegrasikan konektor dan memigrasikan pekerjaan Spark yang ada hanya dengan memperbarui parameter format dengan com.microsoft.sqlserver.jdbc.spark
.
Untuk menyertakan konektor dalam proyek Anda, unduh repositori ini dan buat jar menggunakan SBT.
Menulis ke Tabel SQL baru
Peringatan
overwrite
Mode pertama-tama menghilangkan tabel jika sudah ada di database secara default. Silakan gunakan opsi ini dengan berhati-hati untuk menghindari kehilangan data yang tidak terduga.
Saat menggunakan mode overwrite
jika Anda tidak menggunakan opsi truncate
pada pembuatan ulang tabel, indeks akan hilang. , tabel penyimpan kolom sekarang akan menjadi timbunan. Jika Anda ingin mempertahankan pengindeksan yang ada, tentukan juga opsi truncate
dengan nilai true. Contohnya, .option("truncate","true")
.
server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Tambahkan ke Tabel SQL
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Tentukan tingkat isolasi
Konektor ini secara default menggunakan READ_COMMITTED
tingkat isolasi saat melakukan sisipan massal ke dalam database. Jika Anda ingin mengambil alih tingkat isolasi, gunakan opsi seperti yang ditunjukkan mssqlIsolationLevel
di bawah ini.
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
Baca dari Tabel SQL
jdbcDF = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password).load()
Autentikasi Microsoft Entra
Contoh Python dengan Perwakilan Layanan
context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]
jdbc_db = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("accessToken", access_token) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Contoh Python dengan Kata Sandi Direktori Aktif
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user_name) \
.option("password", password) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Dependensi yang diperlukan harus diinstal untuk mengautentikasi menggunakan Direktori Aktif.
Format user
saat menggunakan ActiveDirectoryPassword harus berupa format UPN, misalnya username@domainname.com
.
Untuk Scala,_com.microsoft.aad.adal4j_
artefak harus diinstal.
Untuk Python,_adal_
pustaka harus diinstal. Ini tersedia melalui pip.
Periksa contoh buku catatan sampel.
Dukungan
Apache Spark Koneksi or untuk Azure SQL dan SQL Server adalah proyek sumber terbuka. Konektor ini tidak dilengkapi dengan dukungan Microsoft apa pun. Untuk masalah dengan atau pertanyaan tentang konektor, buat Masalah di repositori proyek ini. Komunitas konektor aktif dan memantau pengiriman.
Langkah berikutnya
Kunjungi repositori GitHub konektor SQL Spark.
Untuk informasi tentang tingkat isolasi, lihat MENGATUR TINGKAT ISOLASI TRANSAKSI (Transact-SQL).