Bagikan melalui


Konektor Kumpulan SQL Khusus Azure Synapse untuk Apache Spark

Pendahuluan

Konektor Kumpulan SQL Khusus Azure Synapse untuk Apache Spark di Azure Synapse Analytics memungkinkan perpindahan kumpulan data besar yang efisien antara runtime Apache Spark dan Kumpulan SQL khusus. Konektor dikirim sebagai pustaka default dengan Ruang Kerja Azure Synapse. Konektor diimplementasikan menggunakan bahasa Scala. Konektor mendukung Scala dan Python. Untuk menggunakan Konektor dengan pilihan bahasa notebook lainnya, gunakan perintah ajaib Spark - %%spark.

Pada tingkat tinggi, konektor menyediakan kemampuan berikut:

  • Baca dari Kumpulan SQL Khusus Azure Synapse:
    • Membaca himpunan data besar dari Tabel Kumpulan SQL Khusus Synapse (Internal dan Eksternal) dan tampilan.
    • Dukungan push down predikat komprehensif, di mana filter pada DataFrame dipetakan ke push down predikat SQL yang sesuai.
    • Dukungan untuk pemangkasan kolom.
    • Dukungan untuk pendorongan kueri ke bawah.
  • Tulis ke Kumpulan SQL Khusus Azure Synapse:
    • Menyerap data volume besar ke jenis tabel Internal dan Eksternal.
    • Mendukung preferensi mode penyimpanan DataFrame berikut:
      • Append
      • ErrorIfExists
      • Ignore
      • Overwrite
    • Jenis Tulis ke Tabel Eksternal mendukung format file Parquet dan Teks Berbatas (contoh - CSV).
    • Untuk menulis data ke tabel internal, konektor sekarang menggunakan pernyataan COPY alih-alih pendekatan CETAS/CTAS.
    • Penyempurnaan untuk mengoptimalkan performa throughput tulis menyeluruh.
    • Memperkenalkan handel panggilan balik opsional (argumen fungsi Scala) yang dapat digunakan klien untuk menerima metrik pasca-penulisan.
      • Beberapa contoh termasuk - jumlah rekaman, durasi untuk menyelesaikan tindakan tertentu, dan alasan kegagalan.

Pendekatan orkestrasi

Baca

A high-level data flow diagram to describe the connector's orchestration of a read request.

Tulis

A high-level data flow diagram to describe the connector's orchestration of a write request.

Prasyarat

Prasyarat seperti menyiapkan sumber daya Azure yang diperlukan dan langkah-langkah untuk mengonfigurasinya dibahas di bagian ini.

Sumber daya Azure

Tinjau dan siapkan Sumber Daya Azure terikat berikut:

Siapkan database

Sambungkan ke database Kumpulan SQL Khusus Synapse dan jalankan pernyataan pengaturan berikut:

  • Buat pengguna database yang dipetakan ke Identitas pengguna Microsoft Entra yang digunakan untuk masuk ke Ruang Kerja Azure Synapse.

    CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;      
    
  • Buat skema di mana tabel akan ditentukan, sehingga Konektor dapat berhasil menulis-ke dan membaca-dari tabel masing-masing.

    CREATE SCHEMA [<schema_name>];
    

Autentikasi

Autentikasi berbasis ID Microsoft Entra

Autentikasi berbasis ID Microsoft Entra adalah pendekatan autentikasi terintegrasi. Pengguna diharuskan untuk berhasil masuk ke Ruang Kerja Azure Synapse Analytics.

Autentikasi dasar

Pendekatan autentikasi dasar mengharuskan pengguna mengonfigurasi opsi username dan password. Lihat bagian - Opsi konfigurasi untuk mempelajari parameter konfigurasi yang relevan untuk membaca dari dan menulis ke tabel di Kumpulan SQL Khusus Azure Synapse.

Authorization

Azure Data Lake Storage Gen2

Ada dua cara untuk memberikan izin akses ke Azure Data Lake Storage Gen2 - Akun Storage:

  • Peran Access Control berbasis peran - peran Kontributor Data Blob Storage
    • Menetapkan Storage Blob Data Contributor Role pemberian izin Pengguna untuk membaca, menulis, dan menghapus dari Kontainer Blob Azure Storage.
    • RBAC menawarkan pendekatan kontrol kasar di tingkat kontainer.
  • Daftar Kontrol Akses (ACL)
    • Pendekatan ACL memungkinkan kontrol mendetail atas jalur dan/atau file tertentu di bawah folder tertentu.
    • Pemeriksaan ACL tidak diberlakukan jika Pengguna sudah diberikan izin menggunakan pendekatan RBAC.
    • Ada dua jenis izin ACL yang luas:
      • Izin Akses (diterapkan pada tingkat atau objek tertentu).
      • Izin Default (secara otomatis diterapkan untuk semua objek turunan pada saat pembuatannya).
    • Jenis izin meliputi:
      • Execute memungkinkan kemampuan untuk melintasi atau menavigasi hierarki folder.
      • Read memungkinkan kemampuan untuk membaca.
      • Write memungkinkan kemampuan untuk menulis.
    • Mengonfigurasi ACL merupakan hal yang penting sehingga Konektor dapat berhasil menulis dan membaca dari lokasi penyimpanan.

Catatan

  • Jika Anda ingin menjalankan buku catatan menggunakan alur Ruang Kerja Synapse, Anda juga harus memberikan izin akses yang tercantum di atas ke identitas terkelola default Ruang Kerja Synapse. Nama identitas terkelola default ruang kerja sama dengan nama ruang kerja.

  • Untuk menggunakan ruang kerja Synapse dengan akun penyimpanan aman, titik akhir privat terkelola harus dikonfigurasi dari buku catatan. Titik akhir privat terkelola harus disetujui dari bagian Private endpoint connections akun penyimpanan ADLS Gen2 di panel Networking.

Azure Synapse: Kumpulan SQL Khusus

Untuk memungkinkan keberhasilan interaksi dengan Kumpulan SQL Khusus Azure Synapse, otorisasi berikut diperlukan kecuali Anda adalah pengguna yang juga dikonfigurasi sebagai Active Directory Admin pada Titik Akhir SQL Khusus:

  • Skenario baca

    • Berikan db_exporter pada pengguna menggunakan prosedur tersimpan sistem sp_addrolemember.

      EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
      
  • Skenario tulis

    • Konektor menggunakan perintah COPY untuk menulis data dari penahapan ke lokasi terkelola tabel internal.
      • Konfigurasikan izin yang diperlukan yang dijelaskan di sini.

      • Berikut ini adalah cuplikan akses cepat yang sama:

        --Make sure your user has the permissions to CREATE tables in the [dbo] schema
        GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com];
        GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions
        GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has INSERT permissions on the target table
        GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
        

Dokumentasi API

Konektor Kumpulan SQL Khusus Azure Synapse untuk Apache Spark - Dokumentasi API.

Opsi konfigurasi

Agar berhasil melakukan bootstrap dan mengorkestrasi operasi baca atau tulis, Konektor mengharapkan parameter konfigurasi tertentu. Definisi objek - com.microsoft.spark.sqlanalytics.utils.Constants menyediakan daftar konstanta standar untuk setiap kunci parameter.

Berikut daftar opsi konfigurasi berdasarkan skenario penggunaan:

  • Baca menggunakan autentikasi berbasis ID Microsoft Entra
    • Info masuk dipetakan secara otomatis, dan pengguna tidak diharuskan untuk memberikan opsi konfigurasi tertentu.
    • Argumen nama tabel tiga bagian pada metode synapsesql diperlukan untuk membaca dari tabel masing-masing di Kumpulan SQL Khusus Azure Synapse.
  • Membaca menggunakan autentikasi dasar
    • Titik Akhir SQL Khusus Azure Synapse
      • Constants.SERVER- Titik Akhir Kumpulan SQL Khusus Synapse (FQDN Server)
      • Constants.USER - Nama Pengguna SQL.
      • Constants.PASSWORD - Kata Sandi Pengguna SQL.
    • Titik Akhir Azure Data Lake Storage (Gen 2) - Folder Penahapan
      • Constants.DATA_SOURCE - Jalur penyimpanan yang diatur pada parameter lokasi sumber data digunakan untuk penahapan data.
  • Menulis menggunakan autentikasi berbasis ID Microsoft Entra
    • Titik Akhir SQL Khusus Azure Synapse
      • Secara default, Konektor menyimpulkan titik akhir SQL Khusus Synapse menggunakan nama database yang ditetapkan pada synapsesql parameter nama tabel tiga bagian metode tersebut.
      • Sebagai alternatif, pengguna dapat menggunakan opsi Constants.SERVER untuk menentukan titik akhir sql. Pastikan titik akhir menghosting database yang sesuai dengan skema masing-masing.
    • Titik Akhir Azure Data Lake Storage (Gen 2) - Folder Penahapan
      • Untuk Jenis Tabel Internal:
        • Konfigurasikan opsi Constants.TEMP_FOLDER atau Constants.DATA_SOURCE.
        • Jika pengguna memilih untuk menyediakan opsi Constants.DATA_SOURCE, folder penahapan akan diperoleh menggunakan nilai location dari DataSource.
        • Jika keduanya disediakan, maka nilai opsi Constants.TEMP_FOLDER akan digunakan.
        • Dengan tidak adanya opsi folder penahapan, Konektor akan memperolehnya berdasarkan konfigurasi runtime - spark.sqlanalyticsconnector.stagingdir.prefix.
      • Untuk Tipe Tabel Eksternal:
        • Constants.DATA_SOURCE adalah opsi konfigurasi yang diperlukan.
        • Konektor menggunakan jalur penyimpanan yang diatur pada parameter lokasi sumber data dalam kombinasi dengan argumen location ke metode synapsesql dan memperoleh jalur absolut untuk mempertahankan data tabel eksternal.
        • Jika argumen location ke metode synapsesql tidak disediakan, maka konektor akan memperoleh nilai lokasi sebagai <base_path>/dbName/schemaName/tableName.
  • Menulis menggunakan autentikasi dasar
    • Titik Akhir SQL Khusus Azure Synapse
      • Constants.SERVER - - Titik Akhir Kumpulan SQL Khusus Synapse (FQDN Server).
      • Constants.USER - Nama Pengguna SQL.
      • Constants.PASSWORD - Kata Sandi Pengguna SQL.
      • Constants.STAGING_STORAGE_ACCOUNT_KEY terkait dengan Akun Penyimpanan yang menghosting Constants.TEMP_FOLDERS (hanya jenis tabel internal) atau Constants.DATA_SOURCE.
    • Titik Akhir Azure Data Lake Storage (Gen 2) - Folder Penahapan
      • Info masuk autentikasi dasar SQL tidak berlaku untuk mengakses titik akhir penyimpanan.
      • Oleh karena itu, pastikan untuk menetapkan izin akses penyimpanan yang relevan seperti yang dijelaskan di bagian Azure Data Lake Storage Gen2.

Templat kode

Bagian ini menyajikan templat kode referensi untuk menjelaskan cara menggunakan dan memanggil Konektor Kumpulan SQL Khusus Azure Synapse untuk Apache Spark.

Catatan

Menggunakan Konektor pada Python-

  • Konektor hanya didukung di dalam Python untuk Spark 3. Untuk Spark 2.4 (tidak didukung), kita dapat menggunakan API konektor Scala untuk berinteraksi dengan konten dari DataFrame di PySpark dengan menggunakan DataFrame.createOrReplaceTempView atau DataFrame.createOrReplaceGlobalTempView. Lihat Bagian - Menggunakan data materialisasi di seluruh sel.
  • Handel panggilan balik tidak tersedia di Python.

Membaca dari Kumpulan SQL Khusus Azure Synapse

Membaca Permintaan - Tanda tangan metode synapsesql

synapsesql(tableName:String="") => org.apache.spark.sql.DataFrame

Membaca dari tabel menggunakan autentikasi berbasis ID Microsoft Entra

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)

//Show contents of the dataframe
dfToReadFromTable.show()

Membaca dari kueri menggunakan autentikasi berbasis ID Microsoft Entra

Catatan

Pembatasan saat membaca dari kueri:

  • Nama tabel dan kueri tidak dapat ditentukan secara bersamaan.
  • Hanya kueri yang dipilih yang diizinkan. SQL DDL dan DML tidak diizinkan.
  • Opsi pilih dan filter pada dataframe tidak didorong ke kumpulan khusus SQL saat kueri ditentukan.
  • Baca dari kueri hanya tersedia di Spark 3.1 dan 3.2.
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._


// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
    // Name of the SQL Dedicated Pool or database where to run the query
    // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     // Name of the SQL Dedicated Pool or database where to run the query
     // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>")
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")


//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Membaca dari tabel menggunakan autentikasi dasar

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the table will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)
    

//Show contents of the dataframe
dfToReadFromTable.show()

Membaca dari kueri menggunakan autentikasi dasar

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")

// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
    

//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Menulis ke Kumpulan SQL Khusus Azure Synapse

Menulis Permintaan - Tanda tangan metode synapsesql

Tanda tangan metode untuk versi Koneksi or yang dibuat untuk Spark 2.4.8 memiliki satu argumen yang lebih sedikit, daripada yang diterapkan ke versi Spark 3.1.2. Berikut adalah dua tanda tangan metode:

  • Spark Pool Versi 2.4.8
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None):Unit
  • Spark Pool Versi 3.1.2
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None,
           callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit

Menulis menggunakan autentikasi berbasis ID Microsoft Entra

Berikut merupakan templat kode komprehensif yang menjelaskan cara menggunakan Konektor untuk skenario tulis:

//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"

//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")

//Initialize DataFrame that reads CSV data from a given source 
val readDF:DataFrame=spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(1000) //Reads first 1000 rows from the source CSV input.

//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
//    1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
//    2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab. 
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                            Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
    (feedback: Map[String, Any], errorState: Option[Throwable]) => {
    println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
    errorDuringWrite = errorState
}

//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
    write.
    //Configure required configurations.
    options(writeOptionsWithAADAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite).
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL, 
                //Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
                location = None, 
                //Optional parameter to receive a callback.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get

Menulis menggunakan autentikasi dasar

Cuplikan kode berikut menggantikan definisi tulis yang dijelaskan di bagian Menulis menggunakan autentikasi berbasis ID Microsoft Entra, untuk mengirimkan permintaan tulis menggunakan pendekatan autentikasi dasar SQL:

//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                           //Set database user name
                                           Constants.USER -> "<user_name>",
                                           //Set database user's password
                                           Constants.PASSWORD -> "<user_password>",
                                           //Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
                                           Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
                                           //To be used only when writing to internal tables. Storage path will be used for data staging.
                                           Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Configure and submit the request to write to Synapse Dedicated SQL Pool. 
readDF.
    write.
    options(writeOptionsWithBasicAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite). 
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL,
                //Not required for writing to an internal table 
                location = None,
                //Optional parameter.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

Dalam pendekatan autentikasi dasar, untuk membaca data dari jalur penyimpanan sumber, opsi konfigurasi lainnya diperlukan. Cuplikan kode berikut memberikan contoh untuk dibaca dari sumber data Azure Data Lake Storage Gen2 menggunakan info masuk Perwakilan Layanan:

//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
                                "delimiter"->",", 
                                "fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
                                s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
                                s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" -> 
                                    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                                "fs.azure.account.oauth2.client.id" -> s"$spnClientId",
                                "fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
                                "fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
                                "fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
                                "fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(100)

Mode penyimpanan DataFrame yang didukung

Mode penyimpanan berikut didukung saat menulis data sumber ke tabel tujuan di Kumpulan SQL Khusus Azure Synapse:

  • ErrorIfExists (mode penyimpanan default)
    • Jika tabel tujuan sudah ada, operasi tulis dibatalkan dengan pengecualian yang ditampilkan ke penerima panggilan. Jika tidak, tabel baru dibuat dengan data dari folder penahapan.
  • Mengabaikan
    • Jika tabel tujuan sudah ada, operasi tulis akan mengabaikan permintaan tulis tanpa menampilkan kesalahan. Jika tidak, tabel baru dibuat dengan data dari folder penahapan.
  • Menimpa
    • Jika tabel tujuan sudah ada, data yang ada di tujuan diganti dengan data dari folder penahapan. Jika tidak, tabel baru dibuat dengan data dari folder penahapan.
  • Menambahkan
    • Jika tabel tujuan sudah ada, data baru ditambahkan ke dalamnya. Jika tidak, tabel baru dibuat dengan data dari folder penahapan.

Handel panggilan balik permintaan tulis

Perubahan API jalur tulis yang baru memperkenalkan fitur eksperimental untuk menyediakan peta nilai> kunci dari metrik pasca-penulisan kepada klien. Kunci untuk metrik ditentukan dalam definisi Objek baru - Constants.FeedbackConstants. Metrik dapat diambil sebagai string JSON dengan meneruskan handel panggilan balik (Scala Function). Berikut adalah tanda tangan fungsi:

//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit

Berikut adalah beberapa metrik penting (disajikan dalam camel case):

  • WriteFailureCause
  • DataStagingSparkJobDurationInMilliseconds
  • NumberOfRecordsStagedForSQLCommit
  • SQLStatementExecutionDurationInMilliseconds
  • rows_processed

Berikut ini adalah sampel string JSON dengan metrik pasca-tulis:

{
 SparkApplicationId -> <spark_yarn_application_id>,
 SQLStatementExecutionDurationInMilliseconds -> 10113,
 WriteRequestReceivedAtEPOCH -> 1647523790633,
 WriteRequestProcessedAtEPOCH -> 1647523808379,
 StagingDataFileSystemCheckDurationInMilliseconds -> 60,
 command -> "COPY INTO [schema_name].[table_name] ...",
 NumberOfRecordsStagedForSQLCommit -> 100,
 DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
 SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
 DataStagingSparkJobDurationInMilliseconds -> 5252,
 rows_processed -> 100,
 SaveModeApplied -> TRUNCATE_COPY,
 DurationInMillisecondsToValidateFileFormat -> 75,
 status -> Completed,
 SparkApplicationName -> <spark_application_name>,
 ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
 request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
 StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
 JDBCConfigurationsSetupAtEPOCH -> 193,
 StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
 FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
 SchemaInferenceCheckDurationInMilliseconds -> 91,
 SaveModeRequested -> Overwrite,
 DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
 DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}

Sampel kode lainnya

Menggunakan data materialisasi di seluruh sel

createOrReplaceTempView Spark DataFrame dapat digunakan untuk mengakses data yang diambil di sel lain dengan mendaftarkan tampilan sementara.

  • Sel tempat data diambil (katakanlah dengan preferensi bahasa Buku Catatan sebagai Scala)
    //Necessary imports
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.SaveMode
    import com.microsoft.spark.sqlanalytics.utils.Constants
    import org.apache.spark.sql.SqlAnalyticsConnector._
    
    //Configure options and read from Synapse Dedicated SQL Pool.
    val readDF = spark.read.
        //Set Synapse Dedicated SQL End Point name.
        option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
        //Set database user name.
        option(Constants.USER, "<user_name>").
        //Set database user's password. 
        option(Constants.PASSWORD, "<user_password>").
        //Set name of the data source definition that is defined with database scoped credentials.
        option(Constants.DATA_SOURCE,"<data_source_name>").
        //Set the three-part table name from which the read must be performed.
        synapsesql("<database_name>.<schema_name>.<table_name>").
        //Optional - specify number of records the DataFrame would read.
        limit(10)
    //Register the temporary view (scope - current active Spark Session)
    readDF.createOrReplaceTempView("<temporary_view_name>")
  • Sekarang, ubah preferensi bahasa pada Buku Catatan menjadi PySpark (Python) dan ambil data dari tampilan terdaftar <temporary_view_name>
        spark.sql("select * from <temporary_view_name>").show()

Penanganan respons

Pemanggilan synapsesql memiliki dua kemungkinan status akhir - Status Berhasil atau Gagal. Bagian ini menjelaskan cara menangani respons permintaan untuk setiap skenario.

Respons permintaan baca

Setelah selesai, cuplikan respons baca ditampilkan dalam output sel. Kegagalan dalam sel saat ini juga akan membatalkan eksekusi sel berikutnya. Informasi kesalahan terperinci tersedia dalam Log Aplikasi Spark.

Respons permintaan tulis

Secara default, respons tulis dicetak ke output sel. Jika gagal, sel saat ini ditandai sebagai gagal, dan eksekusi sel berikutnya akan dibatalkan. Pendekatan lainnya adalah meneruskan opsi handel panggilan balik ke metode synapsesql. Handel panggilan balik akan menyediakan akses terprogram ke respons tulis.

Pertimbangan lain

  • Saat membaca dari tabel Kumpulan SQL Khusus Azure Synapse:
    • Pertimbangkan untuk menerapkan filter yang diperlukan pada DataFrame guna memanfaatkan fitur pemangkasan kolom Konektor.
    • Skenario baca tidak mendukung klausul TOP(n-rows) saat merangkum pernyataan kueri SELECT. Pilihan untuk membatasi data adalah menggunakan klausul batas(.) DataFrame.
  • Saat menulis ke tabel Kumpulan SQL Khusus Azure Synapse:
    • Untuk jenis tabel internal:
      • Tabel dibuat dengan distribusi data ROUND_ROBIN.
      • Jenis kolom disimpulkan dari DataFrame yang akan membaca data dari sumber. Kolom string dipetakan ke NVARCHAR(4000).
    • Untuk jenis tabel eksternal:
      • Paralelisme awal DataFrame mendorong organisasi data untuk tabel eksternal.
      • Jenis kolom disimpulkan dari DataFrame yang akan membaca data dari sumber.
    • Distribusi data yang lebih baik di seluruh pelaksana dapat dicapai dengan mengatur parameter spark.sql.files.maxPartitionBytes dan repartition DataFrame.
    • Saat menulis himpunan data besar, penting untuk memfaktorkan dampak pengaturan Tingkat Performa DWU yang membatasi ukuran transaksi.
  • Pantau tren pemanfaatan Azure Data Lake Storage Gen2 untuk menemukan perilaku pembatasan yang dapat memengaruhi performa baca dan tulis.

Referensi