適用於 Apache Spark 的 Azure Synapse 專用 SQL 集區連接器

簡介

Azure Synapse Dedicated SQL Pool 連線or for Apache Spark in Azure Synapse Analytics 可讓您有效率地傳輸 Apache Spark 執行時間和 專用 SQL 集 區之間的 大型資料集。 連接器會透過 Azure Synapse 工作區以預設程式庫的形式運送。 連接器是使用 Scala 語言實作。 連接器支援 Scala 和 Python。 若要搭配其他筆記本語言選項使用 連線or,請使用 Spark magic 命令 - %%spark

在高層級,連接器提供下列功能:

  • 從 Azure Synapse 專用 SQL 集區讀取:
    • 從 Synapse 專用 SQL 集區資料表 (內部和外部) 和檢視讀取大型資料集。
    • 完整的述詞下推支援,其中 DataFrame 上的篩選會對應至對應的 SQL 述詞下推。
    • 支援資料行剪除。
    • 支援向下推入查詢。
  • 寫入 Azure Synapse 專用 SQL 集區:
    • 將大型磁片區資料內嵌至內部和外部資料表類型。
    • 支援下列 DataFrame 儲存模式喜好設定:
      • Append
      • ErrorIfExists
      • Ignore
      • Overwrite
    • 寫入外部資料表類型支援 Parquet 和分隔文字檔案格式 (範例 - CSV)。
    • 若要將資料寫入內部資料表,連接器現在會使用 COPY 語句 ,而不是 CETAS/CTAS 方法。
    • 優化端對端寫入輸送量效能的增強功能。
    • 引進選擇性的回呼控制碼(Scala 函式引數),用戶端可用來接收寫入後計量。
      • 少數範例包括 - 記錄數目、完成特定動作的持續時間,以及失敗原因。

協調流程方法

讀取

A high-level data flow diagram to describe the connector's orchestration of a read request.

寫入

A high-level data flow diagram to describe the connector's orchestration of a write request.

必要條件

本節將討論設定必要 Azure 資源及設定必要條件的步驟。

Azure 資源

檢閱並設定下列相依的 Azure 資源:

準備資料庫

連線至 Synapse 專用 SQL 集區資料庫,然後執行下列安裝程式語句:

  • 建立對應至用來登入 Azure Synapse 工作區的 Microsoft Entra 使用者身分識別的資料庫使用者。

    CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;      
    
  • 建立將定義資料表的架構,讓連線器可以成功寫入和讀取個別資料表。

    CREATE SCHEMA [<schema_name>];
    

驗證

Microsoft Entra ID 型驗證

Microsoft Entra ID 型驗證是整合式驗證方法。 使用者必須成功登入 Azure Synapse Analytics 工作區。

基本驗證

基本驗證方法需要使用者設定 usernamepassword 選項。 請參閱 - 組態選項 ,以瞭解相關組態參數,以讀取和寫入 Azure Synapse 專用 SQL 集區中的資料表。

授權

Azure Data Lake Storage Gen2 \(部分機器翻譯\)

有兩種方式可將存取權限授與 Azure Data Lake 儲存體 Gen2 - 儲存體 帳戶:

  • 角色型存取控制角色 - 儲存體 Blob 資料參與者角色
    • 指派 授 Storage Blob Data Contributor Role 與使用者讀取、寫入和刪除Azure 儲存體 Blob 容器的許可權。
    • RBAC 在容器層級提供粗略的控制方法。
  • 存取控制清單 (ACL)
    • ACL 方法允許對指定資料夾下的特定路徑和/或檔案進行更細緻的控制。
    • 如果使用者已使用 RBAC 方法授與許可權,則不會強制執行 ACL 檢查。
    • ACL 許可權有兩種廣泛的類型:
      • 存取權限(套用在特定層級或物件上)。
      • 預設許可權(在建立時自動套用到所有子物件)。
    • 許可權類型包括:
      • Execute 可讓您周遊或流覽資料夾階層。
      • Read 可讓您讀取。
      • Write 可讓您撰寫。
    • 請務必設定 ACL,讓連線器能夠成功從儲存體位置寫入和讀取。

注意

  • 如果您想要使用 Synapse 工作區管線執行筆記本,您也必須將上述列出的存取權限授與 Synapse Workspace 預設受控識別。 工作區的預設受控識別名稱與工作區的名稱相同。

  • 若要使用 Synapse 工作區搭配安全的儲存體帳戶,必須 從筆記本設定受控私人端點。 受控私人端點必須從窗格中的 ADLS Gen2 儲存體帳戶區 Private endpoint connectionsNetworking 核准。

Azure Synapse 專用 SQL 集區

若要啟用與 Azure Synapse 專用 SQL 集區的成功互動,除非您是使用者也設定為 Active Directory Admin 專用 SQL 端點上的 ,否則需要下列授權:

  • 讀取案例

    • 使用系統預存程式 sp_addrolemember 授與使用者 db_exporter

      EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
      
  • 寫入案例

    • 連線or 會使用 COPY 命令將資料從暫存寫入內部資料表的受控位置。
      • 設定這裡 所述的 必要許可權。

      • 以下是相同專案的快速存取程式碼片段:

        --Make sure your user has the permissions to CREATE tables in the [dbo] schema
        GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com];
        GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions
        GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has INSERT permissions on the target table
        GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
        

API 文件

適用于 Apache Spark 的 Azure Synapse 專用 SQL 集區連線or - API 檔。

設定選項

若要成功啟動程式並協調讀取或寫入作業,連線or 需要特定組態參數。 物件定義 - com.microsoft.spark.sqlanalytics.utils.Constants 提供每個參數索引鍵的標準化常數清單。

以下是根據使用案例的組態選項清單:

  • 使用 Microsoft Entra ID 型驗證讀取
    • 認證會自動對應,而且使用者不需要提供特定的組態選項。
    • 方法上的 synapsesql 三部分資料表名稱引數必須從 Azure Synapse 專用 SQL 集區中的個別資料表讀取。
  • 使用基本驗證讀取
    • Azure Synapse 專用 SQL 端點
      • Constants.SERVER - Synapse 專用 SQL 集區端點 (伺服器 FQDN)
      • Constants.USER - SQL 使用者名稱。
      • Constants.PASSWORD - SQL 使用者密碼。
    • Azure Data Lake 儲存體 (Gen 2) 端點 - 預備資料夾
      • Constants.DATA_SOURCE- 儲存體資料來源位置參數上設定的路徑用於資料暫存。
  • 使用 Microsoft Entra ID 型驗證撰寫
    • Azure Synapse 專用 SQL 端點
      • 根據預設,連線程式會使用方法三部分資料表名稱參數上 synapsesql 設定的資料庫名稱來推斷 Synapse Dedicated SQL 端點。
      • 或者,使用者可以使用 Constants.SERVER 選項來指定 sql 端點。 確定端點裝載具有個別架構的對應資料庫。
    • Azure Data Lake 儲存體 (Gen 2) 端點 - 預備資料夾
      • 針對內部資料表類型:
        • Constants.TEMP_FOLDER設定 或 Constants.DATA_SOURCE 選項。
        • 如果使用者選擇提供 Constants.DATA_SOURCE 選項,預備資料夾會使用 location DataSource 的值來衍生。
        • 如果兩者都提供, Constants.TEMP_FOLDER 則會使用選項值。
        • 如果沒有預備資料夾選項,連線器會根據執行時間組態衍生一個 - spark.sqlanalyticsconnector.stagingdir.prefix
      • 針對外部資料表類型:
        • Constants.DATA_SOURCE 是必要的組態選項。
        • 連接器會使用資料來源位置參數上設定的儲存路徑,結合 location 方法的引數 synapsesql ,並衍生絕對路徑來保存外部資料表資料。
        • 如果未指定方法的 locationsynapsesql 引數,則連接器會衍生位置值做為 <base_path>/dbName/schemaName/tableName
  • 使用基本驗證撰寫
    • Azure Synapse 專用 SQL 端點
      • Constants.SERVER - Synapse 專用 SQL 集區端點(伺服器 FQDN)。
      • Constants.USER - SQL 使用者名稱。
      • Constants.PASSWORD - SQL 使用者密碼。
      • Constants.STAGING_STORAGE_ACCOUNT_KEY與主控 Constants.TEMP_FOLDERS 的儲存體帳戶相關聯(僅限內部資料表類型)或 Constants.DATA_SOURCE
    • Azure Data Lake 儲存體 (Gen 2) 端點 - 預備資料夾
      • SQL 基本驗證認證不適用於存取儲存體端點。
      • 因此,請確定指派相關的儲存體存取權限,如 Azure Data Lake 儲存體 Gen2 一節 所述。

程式碼範本

本節提供參考程式碼範本,說明如何使用和叫用適用于 Apache Spark 的 Azure Synapse 專用 SQL 集區連線或。

注意

在 Python 中使用 連線or-

  • 只有適用于 Spark 3 的 Python 支援連接器。 針對 Spark 2.4(不支援), 我們可以使用 Scala 連接器 API,透過使用 DataFrame.createOrReplaceTempView 或 DataFrame.createOrReplaceGlobalTempView 與 PySpark 中 DataFrame 的內容互動。 請參閱章節 - 跨資料格 使用具體化資料。
  • 在 Python 中無法使用回呼控制碼。

從 Azure Synapse 專用 SQL 集區讀取

讀取要求 - synapsesql 方法簽章

synapsesql(tableName:String="") => org.apache.spark.sql.DataFrame

使用 Microsoft Entra ID 型驗證從資料表讀取

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)

//Show contents of the dataframe
dfToReadFromTable.show()

使用 Microsoft Entra ID 型驗證從查詢讀取

注意

從查詢讀取時的限制:

  • 無法同時指定資料表名稱和查詢。
  • 只允許選取查詢。 不允許 DDL 和 DML SQL。
  • 指定查詢時,資料框架上的選取和篩選選項不會向下推送至 SQL 專用集區。
  • 從查詢讀取僅適用于 Spark 3.1 和 3.2。
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._


// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
    // Name of the SQL Dedicated Pool or database where to run the query
    // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     // Name of the SQL Dedicated Pool or database where to run the query
     // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>")
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")


//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

使用基本驗證從資料表讀取

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the table will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)
    

//Show contents of the dataframe
dfToReadFromTable.show()

使用基本驗證從查詢讀取

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")

// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
    

//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

寫入 Azure Synapse 專用 SQL 集區

寫入要求 - synapsesql 方法簽章

針對 Spark 2.4.8 建置之連線or 版本的方法簽章,其引數比套用至 Spark 3.1.2 版本少一個。 以下是兩個方法簽章:

  • Spark 集區 2.4.8 版
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None):Unit
  • Spark 集區 3.1.2 版
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None,
           callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit

使用 Microsoft Entra ID 型驗證撰寫

以下是一個完整的程式碼範本,描述如何使用 連線or 進行寫入案例:

//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"

//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")

//Initialize DataFrame that reads CSV data from a given source 
val readDF:DataFrame=spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(1000) //Reads first 1000 rows from the source CSV input.

//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
//    1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
//    2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab. 
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                            Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
    (feedback: Map[String, Any], errorState: Option[Throwable]) => {
    println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
    errorDuringWrite = errorState
}

//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
    write.
    //Configure required configurations.
    options(writeOptionsWithAADAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite).
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL, 
                //Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
                location = None, 
                //Optional parameter to receive a callback.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get

使用基本驗證撰寫

下列程式碼片段會取代使用 Microsoft Entra ID 型驗證 撰寫一節中所述 的寫入定義,以使用 SQL 基本驗證方法提交寫入要求:

//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                           //Set database user name
                                           Constants.USER -> "<user_name>",
                                           //Set database user's password
                                           Constants.PASSWORD -> "<user_password>",
                                           //Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
                                           Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
                                           //To be used only when writing to internal tables. Storage path will be used for data staging.
                                           Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Configure and submit the request to write to Synapse Dedicated SQL Pool. 
readDF.
    write.
    options(writeOptionsWithBasicAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite). 
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL,
                //Not required for writing to an internal table 
                location = None,
                //Optional parameter.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

在基本驗證方法中,若要從來源儲存體路徑讀取資料,則需要其他組態選項。 下列程式碼片段提供使用服務主體認證從 Azure Data Lake 儲存體 Gen2 資料來源讀取的範例:

//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
                                "delimiter"->",", 
                                "fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
                                s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
                                s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" -> 
                                    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                                "fs.azure.account.oauth2.client.id" -> s"$spnClientId",
                                "fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
                                "fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
                                "fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
                                "fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(100)

支援的 DataFrame 儲存模式

將來源資料寫入 Azure Synapse 專用 SQL 集區中的目的地資料表時,支援下列儲存模式:

  • ErrorIfExists (預設儲存模式)
    • 如果目的地資料表存在,則寫入會中止,並傳回給被呼叫端的例外狀況。 否則,系統會使用來自暫存資料夾的資料來建立新的資料表。
  • 忽略
    • 如果目的地資料表存在,則寫入會忽略寫入要求,而不會傳回錯誤。 否則,系統會使用來自暫存資料夾的資料來建立新的資料表。
  • 覆蓋
    • 如果目的地資料表存在,則目的地中的現有資料會取代為預備資料夾中的資料。 否則,系統會使用來自暫存資料夾的資料來建立新的資料表。
  • 附加
    • 如果目的地資料表存在,則會將新資料附加至該資料表。 否則,系統會使用來自暫存資料夾的資料來建立新的資料表。

寫入要求回呼控制碼

新的寫入路徑 API 變更引進了實驗性功能,以提供用戶端寫入後計量的索引鍵/ > 值對應。 計量的索引鍵是在新的物件定義中定義 - Constants.FeedbackConstants 。 您可以藉由傳入回呼控制碼 (a Scala Function ) 來擷取計量作為 JSON 字串。 以下是函式簽章:

//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit

以下是一些值得注意的計量(在駱駝案例中顯示):

  • WriteFailureCause
  • DataStagingSparkJobDurationInMilliseconds
  • NumberOfRecordsStagedForSQLCommit
  • SQLStatementExecutionDurationInMilliseconds
  • rows_processed

以下是具有寫入後計量的範例 JSON 字串:

{
 SparkApplicationId -> <spark_yarn_application_id>,
 SQLStatementExecutionDurationInMilliseconds -> 10113,
 WriteRequestReceivedAtEPOCH -> 1647523790633,
 WriteRequestProcessedAtEPOCH -> 1647523808379,
 StagingDataFileSystemCheckDurationInMilliseconds -> 60,
 command -> "COPY INTO [schema_name].[table_name] ...",
 NumberOfRecordsStagedForSQLCommit -> 100,
 DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
 SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
 DataStagingSparkJobDurationInMilliseconds -> 5252,
 rows_processed -> 100,
 SaveModeApplied -> TRUNCATE_COPY,
 DurationInMillisecondsToValidateFileFormat -> 75,
 status -> Completed,
 SparkApplicationName -> <spark_application_name>,
 ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
 request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
 StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
 JDBCConfigurationsSetupAtEPOCH -> 193,
 StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
 FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
 SchemaInferenceCheckDurationInMilliseconds -> 91,
 SaveModeRequested -> Overwrite,
 DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
 DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}

其他程式碼範例

跨資料格使用具體化資料

Spark DataFrame createOrReplaceTempView 可以藉由註冊暫存檢視,來存取另一個資料格中擷取的資料。

  • 擷取資料的資料格(例如,以筆記本語言喜好設定為 Scala
    //Necessary imports
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.SaveMode
    import com.microsoft.spark.sqlanalytics.utils.Constants
    import org.apache.spark.sql.SqlAnalyticsConnector._
    
    //Configure options and read from Synapse Dedicated SQL Pool.
    val readDF = spark.read.
        //Set Synapse Dedicated SQL End Point name.
        option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
        //Set database user name.
        option(Constants.USER, "<user_name>").
        //Set database user's password. 
        option(Constants.PASSWORD, "<user_password>").
        //Set name of the data source definition that is defined with database scoped credentials.
        option(Constants.DATA_SOURCE,"<data_source_name>").
        //Set the three-part table name from which the read must be performed.
        synapsesql("<database_name>.<schema_name>.<table_name>").
        //Optional - specify number of records the DataFrame would read.
        limit(10)
    //Register the temporary view (scope - current active Spark Session)
    readDF.createOrReplaceTempView("<temporary_view_name>")
  • 現在,將 Notebook 上的語言喜好設定變更為 PySpark (Python) ,並從已註冊的檢視擷取資料 <temporary_view_name>
        spark.sql("select * from <temporary_view_name>").show()

回應處理

叫用 synapsesql 有兩個可能的結束狀態 - 成功或失敗狀態。 本節說明如何處理每個案例的要求回應。

讀取要求回應

完成時,讀取回應程式碼片段會顯示在儲存格的輸出中。 目前儲存格中的失敗也會取消後續的儲存格執行。 Spark 應用程式記錄中提供詳細的錯誤資訊。

寫入要求回應

根據預設,寫入回應會列印至資料格輸出。 失敗時,目前的儲存格標示為失敗,後續的儲存格執行將會中止。 另一種方法是將 回呼控制碼 選項傳遞至 synapsesql 方法。 回呼控制碼會以程式設計方式存取寫入回應。

其他考量

  • 從 Azure Synapse 專用 SQL 集區資料表讀取時:
    • 請考慮在 DataFrame 上套用必要的篩選,以利用連線或資料行剪除功能。
    • 讀取案例不支援 TOP(n-rows) 子句,在框架 SELECT 查詢語句時。 限制資料的選擇是使用 DataFrame 的 limit(.) 子句。
  • 寫入 Azure Synapse 專用 SQL 集區資料表時:
    • 針對內部資料表類型:
      • 資料表會使用ROUND_ROBIN資料散發來建立。
      • 資料行類型是從從來源讀取資料的 DataFrame 推斷而來。 字串資料行會對應至 NVARCHAR(4000)
    • 針對外部資料表類型:
      • DataFrame 的初始平行處理原則會驅動外部資料表的資料組織。
      • 資料行類型是從從來源讀取資料的 DataFrame 推斷而來。
    • 藉由調整 spark.sql.files.maxPartitionBytes 和 DataFrame 的參數 repartition ,即可達成跨執行程式更佳的資料散發。
    • 寫入大型資料集時,請務必考慮限制交易大小的 DWU 效能等級 設定的影響
  • 監視 Azure Data Lake 儲存體 Gen2 使用率趨勢,以找出可能會影響 讀取和寫入效能的節流行為。

參考資料