Bagikan melalui


Konektor Apache Spark: SQL Server dan Azure SQL

Konektor Apache Spark untuk SQL Server dan Azure SQL adalah konektor berperforma tinggi yang dapat Anda gunakan untuk menyertakan data transaksional dalam analitik big data dan mempertahankan hasil untuk kueri atau pelaporan ad hoc. Dengan menggunakan konektor, Anda dapat menggunakan database SQL, lokal, atau di cloud, sebagai sumber data input atau sink data output untuk pekerjaan Spark.

Nota

Konektor ini tidak dipertahankan secara aktif. Artikel ini hanya dipertahankan untuk tujuan pengarsipan.

Pustaka ini berisi kode sumber untuk Apache Spark Connector untuk platform SQL Server dan Azure SQL.

Apache Spark adalah mesin analitik terpadu untuk pemrosesan data skala besar.

Dua versi konektor tersedia melalui Maven: versi yang kompatibel dengan 2.4.x dan versi yang kompatibel dengan 3.0.x. Unduh konektor dari maven.org dan impor menggunakan koordinat:

Konektor Koordinat Maven
Konektor yang kompatibel dengan Spark 2.4.x com.microsoft.azure:spark-mssql-connector:1.0.2
Konektor Spark 3.0.x yang kompatibel com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Konektor yang kompatibel dengan Spark 3.1.x com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

Anda juga dapat membangun konektor dari sumber atau mengunduh JAR dari bagian Rilis di GitHub. Untuk informasi terbaru tentang konektor, lihat Repositori GitHub konektor SQL Spark.

Fitur yang didukung

  • Dukungan untuk semua pengikatan Spark (Scala, Python, R)
  • Autentikasi dasar dan dukungan Tab Kunci Direktori Aktif (AD)
  • Dukungan penulisan yang diatur ulang dataframe
  • Dukungan untuk menulis ke Instans Tunggal SQL Server dan Kumpulan Data di Kluster Big Data SQL Server
  • Dukungan konektor andal untuk Sql Server Single Instance
Komponen Versi yang didukung
Apache Spark 2.4.x, 3.0.x, 3.1.x
Scala 2.11, 2.12
Driver Microsoft JDBC untuk SQL Server 8.4
Microsoft SQL Server SQL Server 2008 atau yang lebih baru
Azure SQL Database Didukung

Opsi yang didukung

Konektor Apache Spark untuk SQL Server dan Azure SQL mendukung opsi yang ditentukan dalam artikel SQL DataSource JDBC .

Selain itu, konektor mendukung opsi berikut:

Opsi Bawaan Deskripsi
reliabilityLevel BEST_EFFORT BEST_EFFORT atau NO_DUPLICATES. NO_DUPLICATES menerapkan penyisipan yang andal dalam skenario pemulihan eksekutor
dataPoolDataSource none none menyiratkan nilai tidak disetel dan konektor harus menulis ke instans tunggal dari SQL Server. Atur nilai ini ke nama sumber data untuk menulis tabel kumpulan data di Kluster Big Data.
isolationLevel READ_COMMITTED Tentukan tingkat isolasi
tableLock false Menerapkan penyisipan opsi TABLOCK untuk meningkatkan performa penulisan
schemaCheckEnabled true Menonaktifkan bingkai data yang ketat dan pemeriksaan skema tabel SQL saat diatur ke false

Atur opsi salin massal lainnya sebagai opsi pada dataframe. Konektor meneruskan opsi ini ke bulkcopy API saat menulis.

Perbandingan performa

Konektor Apache Spark untuk SQL Server dan Azure SQL hingga 15x lebih cepat daripada konektor JDBC generik untuk menulis ke SQL Server. Karakteristik performa bervariasi pada jenis, volume data, opsi yang digunakan, dan mungkin menampilkan variasi di antara setiap eksekusi. Hasil performa berikut adalah waktu yang diperlukan untuk mengganti tabel SQL dengan 143,9 juta baris dalam lingkungan spark dataframe. Spark dataframe dibangun dengan membaca store_sales tabel HDFS yang dihasilkan menggunakan Spark TPCDS Benchmark. Waktu untuk membaca dari store_sales hingga dataframe dikecualikan. Hasilnya dirata-rata dari tiga kali pelaksanaan.

Jenis Konektor Opsi Deskripsi Waktu untuk menulis
JDBCConnector Bawaan Konektor JDBC generik dengan opsi default 1.385 detik
sql-spark-connector BEST_EFFORT Upaya sql-spark-connector terbaik dengan opsi default 580 detik
sql-spark-connector NO_DUPLICATES Dapat diandalkan sql-spark-connector 709 detik
sql-spark-connector BEST_EFFORT + tabLock=true Upaya sql-spark-connector terbaik dengan kunci tabel diaktifkan 72 detik
sql-spark-connector NO_DUPLICATES + tabLock=true Dapat diandalkan sql-spark-connector dengan kunci tabel yang diaktifkan 198 detik

Konfigurasi

  • Konfigurasi Spark: jumlah_pengeksekusi = 20, memori_pengeksekusi = '1664 m', inti_pengeksekusi = 2
  • Konfigurasi Data Gen: faktor skala=50, tabel terpartisi=true
  • File data store_sales dengan jumlah baris 143.997.590

Lingkungan

  • SQL Server Kluster Big Data CU5
  • master + 6 simpul
  • Setiap simpul server Gen-5, dengan RAM 512 GB, NVM 4 TB per simpul, dan NIC 10 Gbps

Masalah yang umum dihadapi

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

Kesalahan ini terjadi saat Anda menggunakan versi driver mssql yang lebih lama di lingkungan Hadoop Anda. Konektor sekarang menyertakan driver ini. Jika sebelumnya Anda menggunakan Konektor Azure SQL dan driver yang diinstal secara manual pada kluster Anda untuk kompatibilitas autentikasi Microsoft Entra, hapus driver tersebut.

Untuk memperbaiki kesalahan:

  1. Jika Anda menggunakan lingkungan Hadoop generik, periksa dan hapus mssql JAR dengan perintah berikut: rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar. Jika Anda menggunakan Databricks, tambahkan skrip init global atau kluster untuk menghapus versi mssql driver lama dari /databricks/jars folder, atau tambahkan baris ini ke skrip yang ada: rm /databricks/jars/*mssql*

  2. Tambahkan paket adal4j dan paket mssql. Misalnya, Anda dapat menggunakan Maven tetapi cara apa pun harus berfungsi.

    Perhatian

    Jangan instal konektor SQL spark dengan cara ini.

  3. Tambahkan kelas driver ke konfigurasi koneksi Anda. Contohnya:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

Untuk informasi selengkapnya, lihat resolusi untuk https://github.com/microsoft/sql-spark-connector/issues/26.

Mulai

Konektor Apache Spark untuk SQL Server dan Azure SQL didasarkan pada Spark DataSourceV1 API dan SQL Server Bulk API. Ini menggunakan antarmuka yang sama dengan konektor Spark-SQL JDBC bawaan. Dengan menggunakan integrasi ini, Anda dapat dengan mudah mengintegrasikan konektor dan memigrasikan pekerjaan Spark yang ada dengan memperbarui parameter format dengan com.microsoft.sqlserver.jdbc.spark.

Untuk menyertakan konektor dalam proyek Anda, unduh repositori ini dan buat JAR menggunakan SBT.

Menulis ke tabel SQL baru

Peringatan

Mode overwrite terlebih dahulu menghapus tabel jika sudah ada di database. Gunakan opsi ini dengan hati-hati untuk menghindari kehilangan data yang tidak terduga.

Jika Anda menggunakan mode overwrite tanpa opsi truncate saat membuat ulang tabel, operasi akan menghapus indeks. Selain itu, tabel penyimpan kolom berubah menjadi tabel tumpuk. Untuk mempertahankan indeks yang ada, atur opsi ke truncatetrue. Contohnya, .option("truncate","true").

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Tambahkan ke tabel SQL

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Tentukan tingkat isolasi

Konektor ini menggunakan READ_COMMITTED tingkat isolasi secara default saat menyisipkan data secara massal ke dalam database. Untuk mengubah tingkat isolasi, gunakan opsi mssqlIsolationLevel.

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

Membaca dari tabel SQL

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Microsoft Entra otentikasi

Contoh Python dengan prinsipal layanan

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Contoh Python dengan kata sandi Direktori Aktif

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Untuk mengautentikasi dengan menggunakan Direktori Aktif, instal dependensi yang diperlukan.

Saat Anda menggunakan ActiveDirectoryPassword, user nilainya harus dalam format UPN, seperti username@domainname.com.

Untuk Scala, pasang com.microsoft.aad.adal4j artefak.

Untuk Python, instal adal perpustakaan. Pustaka ini tersedia melalui pip.

Misalnya, lihat contoh buku catatan.