Bagikan melalui


Konektor Apache Spark: SQL Server & Azure SQL

Konektor Apache Spark untuk SQL Server dan Azure SQL adalah konektor berperforma tinggi yang memungkinkan Anda menggunakan data transaksional dalam analitik big data dan mempertahankan hasil untuk kueri atau pelaporan ad hoc. Konektor memungkinkan Anda menggunakan database SQL, lokal, atau di cloud, sebagai sumber data input atau sink data output untuk pekerjaan Spark.

Pustaka ini berisi kode sumber untuk apache Spark Koneksi or untuk SQL Server dan Azure SQL.

Apache Spark adalah mesin analitik terpadu untuk pemrosesan data skala besar.

Ada dua versi konektor yang tersedia melalui Maven, versi yang kompatibel dengan 2.4.x dan versi yang kompatibel dengan 3.0.x. Kedua versi dapat ditemukan di sini dan dapat diimpor menggunakan koordinat di bawah ini:

Konektor Koordinat Maven
Konektor kompatibel Spark 2.4.x com.microsoft.azure:spark-mssql-connector:1.0.2
Konektor kompatibel Spark 3.0.x com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Konektor kompatibel Spark 3.1.x com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

Anda juga dapat membangun konektor dari sumber atau mengunduh jar dari bagian Rilis di GitHub. Untuk informasi terbaru tentang konektor, lihat Repositori GitHub konektor SQL Spark.

Fitur yang Didukung

  • Dukungan untuk semua pengikatan Spark (Scala, Python, R)
  • Autentikasi dasar dan dukungan Tab Kunci Direktori Aktif (AD)
  • Dukungan tulis yang diurutkan dataframe ulang
  • Dukungan untuk menulis ke instans Tunggal SQL Server dan Kumpulan Data di SQL Server Kluster Big Data
  • Dukungan konektor andal untuk Sql Server Single Instance
Komponen Versi yang Didukung
Apache Spark 2.4.x, 3.0.x, 3.1.x
Scala 2.11, 2.12
Driver Microsoft JDBC untuk SQL Server 8.4
Microsoft SQL Server SQL Server 2008 atau yang lebih baru
Azure SQL Database Didukung

Opsi yang Didukung

Apache Spark Koneksi or untuk SQL Server dan Azure SQL mendukung opsi yang ditentukan di sini: SQL DataSource JDBC

Selain opsi berikut didukung

Opsi Default Deskripsi
reliabilityLevel BEST_EFFORT BEST_EFFORT atau NO_DUPLICATES. NO_DUPLICATES mengimplementasikan sisipan yang andal dalam skenario hidupkan ulang pelaksana
dataPoolDataSource none none menyiratkan nilai tidak diatur dan konektor harus menulis ke instans tunggal SQL Server. Atur nilai ini ke nama sumber data untuk menulis tabel kumpulan data di Kluster Big Data
isolationLevel READ_COMMITTED Tentukan tingkat isolasi
tableLock false Menerapkan sisipkan dengan TABLOCK opsi untuk meningkatkan performa tulis
schemaCheckEnabled true Menonaktifkan bingkai data ketat dan pemeriksaan skema tabel sql saat diatur ke false

Opsi penyalinan massal lainnya dapat diatur sebagai opsi pada dan akan diteruskan dataframe ke bulkcopy API saat menulis

Perbandingan performa

Apache Spark Koneksi or untuk SQL Server dan Azure SQL hingga 15x lebih cepat daripada konektor JDBC generik untuk menulis ke SQL Server. Karakteristik performa bervariasi pada jenis, volume data, opsi yang digunakan, dan dapat menunjukkan eksekusi untuk menjalankan variasi. Hasil performa berikut adalah waktu yang diperlukan untuk menimpa tabel SQL dengan 143,9M baris dalam spark dataframe. Spark dataframe dibangun dengan membaca store_sales tabel HDFS yang dihasilkan menggunakan Spark TPCDS Benchmark. Waktu untuk membaca store_salesdataframe dikecualikan. Hasilnya rata-rata lebih dari tiga eksekusi.

Jenis Koneksi or Opsi Deskripsi Waktu untuk menulis
JDBCConnector Default Konektor JDBC generik dengan opsi default 1385 detik
sql-spark-connector BEST_EFFORT Upaya sql-spark-connector terbaik dengan opsi default 580 detik
sql-spark-connector NO_DUPLICATES Dapat diandalkan sql-spark-connector 709 detik
sql-spark-connector BEST_EFFORT + tabLock=true Upaya sql-spark-connector terbaik dengan kunci tabel diaktifkan 72 detik
sql-spark-connector NO_DUPLICATES + tabLock=true Andalkan sql-spark-connector dengan kunci tabel diaktifkan 198 detik

Konfigurasi

  • Konfigurasi Spark: num_executors = 20, executor_memory = '1664 m', executor_cores = 2
  • Konfigurasi Data Gen: scale_factor=50, partitioned_tables=true
  • File data store_sales dengan nr baris 143.997.590

Lingkungan

Masalah yang Umum Dihadapi

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

Masalah ini muncul dari penggunaan versi driver mssql yang lebih lama (yang sekarang disertakan dalam konektor ini) di lingkungan hadoop Anda. Jika Anda berasal dari menggunakan Koneksi or Azure SQL sebelumnya dan telah menginstal driver secara manual ke kluster tersebut untuk kompatibilitas autentikasi Microsoft Entra, Anda harus menghapus driver tersebut.

Langkah-langkah untuk memperbaiki masalah:

  1. Jika Anda menggunakan lingkungan Hadoop generik, periksa dan hapus mssql jar: rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar. Jika Anda menggunakan Databricks, tambahkan skrip init global atau kluster untuk menghapus versi lama driver mssql dari /databricks/jars folder, atau tambahkan baris ini ke skrip yang ada: rm /databricks/jars/*mssql*

  2. adal4j Tambahkan paket dan mssql . Misalnya, Anda dapat menggunakan Maven tetapi cara apa pun harus berfungsi.

    Perhatian

    Jangan instal konektor spark SQL dengan cara ini.

  3. Tambahkan kelas driver ke konfigurasi koneksi Anda. Misalnya:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

Untuk informasi dan penjelasan selengkapnya, lihat resolusi ke https://github.com/microsoft/sql-spark-connector/issues/26.

Memulai

Apache Spark Koneksi or untuk SQL Server dan Azure SQL didasarkan pada Spark DataSourceV1 API dan SQL Server Bulk API dan menggunakan antarmuka yang sama dengan konektor JDBC Spark-SQL bawaan. Integrasi ini memungkinkan Anda untuk dengan mudah mengintegrasikan konektor dan memigrasikan pekerjaan Spark yang ada hanya dengan memperbarui parameter format dengan com.microsoft.sqlserver.jdbc.spark.

Untuk menyertakan konektor dalam proyek Anda, unduh repositori ini dan buat jar menggunakan SBT.

Menulis ke Tabel SQL baru

Peringatan

overwrite Mode pertama-tama menghilangkan tabel jika sudah ada di database secara default. Silakan gunakan opsi ini dengan berhati-hati untuk menghindari kehilangan data yang tidak terduga.

Saat menggunakan mode overwrite jika Anda tidak menggunakan opsi truncate pada pembuatan ulang tabel, indeks akan hilang. , tabel penyimpan kolom sekarang akan menjadi timbunan. Jika Anda ingin mempertahankan pengindeksan yang ada, tentukan juga opsi truncate dengan nilai true. Contohnya, .option("truncate","true").

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Tambahkan ke Tabel SQL

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Tentukan tingkat isolasi

Konektor ini secara default menggunakan READ_COMMITTED tingkat isolasi saat melakukan sisipan massal ke dalam database. Jika Anda ingin mengambil alih tingkat isolasi, gunakan opsi seperti yang ditunjukkan mssqlIsolationLevel di bawah ini.

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

Baca dari Tabel SQL

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Autentikasi Microsoft Entra

Contoh Python dengan Perwakilan Layanan

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Contoh Python dengan Kata Sandi Direktori Aktif

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Dependensi yang diperlukan harus diinstal untuk mengautentikasi menggunakan Direktori Aktif.

Format user saat menggunakan ActiveDirectoryPassword harus berupa format UPN, misalnya username@domainname.com.

Untuk Scala,_com.microsoft.aad.adal4j_ artefak harus diinstal.

Untuk Python,_adal_ pustaka harus diinstal. Ini tersedia melalui pip.

Periksa contoh buku catatan sampel.

Dukungan

Apache Spark Koneksi or untuk Azure SQL dan SQL Server adalah proyek sumber terbuka. Konektor ini tidak dilengkapi dengan dukungan Microsoft apa pun. Untuk masalah dengan atau pertanyaan tentang konektor, buat Masalah di repositori proyek ini. Komunitas konektor aktif dan memantau pengiriman.

Langkah berikutnya

Kunjungi repositori GitHub konektor SQL Spark.

Untuk informasi tentang tingkat isolasi, lihat MENGATUR TINGKAT ISOLASI TRANSAKSI (Transact-SQL).