教學課程:使用 Spark 作業將資料內嵌至 SQL Server 資料集區

適用於:SQL Server 2019 (15.x)

重要

Microsoft SQL Server 2019 巨量資料叢集附加元件將會淘汰。 SQL Server 2019 巨量資料叢集的支援將於 2025 年 2 月 28 日結束。 平台上將完全支援含軟體保證 SQL Server 2019 的所有現有使用者,而且軟體將會持續透過 SQL Server 累積更新來維護,直到該時間為止。 如需詳細資訊,請參閱公告部落格文章Microsoft SQL Server 平台上的巨量資料選項

本教學課程示範如何使用 Spark 作業,將資料載入 SQL Server 2019 巨量資料叢集的資料集區

在本教學課程中,您會了解如何:

  • 在資料集區中建立外部資料表。
  • 建立 Spark 作業,以便從 HDFS 載入資料。
  • 查詢外部資料表中的結果。

提示

如果您想要的話,也可以下載並執行用於本教學課程中命令的指令碼。 如需指示,請參閱 GitHub 上的資料集區範例

必要條件

在資料集區中建立外部資料表

下列步驟會在資料集區中建立名為 web_clickstreams_spark_results 的外部資料表。 然後,此資料表可作為資料內嵌至巨量資料叢集的位置。

  1. 在 Azure Data Studio 中,連線到巨量資料叢集的 SQL Server 主要執行個體。 如需詳細資訊,請參閱連線到 SQL Server 主要執行個體

  2. 按兩下 [伺服器] 視窗中的連線,顯示 SQL Server 主要執行個體的伺服器儀表板。 選取 [新增查詢]。

    SQL Server master instance query

  3. 建立 MSSQL-Spark 連接器的權限。

    USE Sales
    CREATE LOGIN sample_user  WITH PASSWORD ='password123!#' 
    CREATE USER sample_user FROM LOGIN sample_user
    
    -- To create external tables in data pools
    GRANT ALTER ANY EXTERNAL DATA SOURCE TO sample_user;
    
    -- To create external tables
    GRANT CREATE TABLE TO sample_user;
    GRANT ALTER ANY SCHEMA TO sample_user;
    
    -- To view database state for Sales
    GRANT VIEW DATABASE STATE ON DATABASE::Sales TO sample_user;
    
    ALTER ROLE [db_datareader] ADD MEMBER sample_user
    ALTER ROLE [db_datawriter] ADD MEMBER sample_user
    
  4. 如果資料集區沒有外部資料來源,請加以建立。

    USE Sales
    GO
    IF NOT EXISTS(SELECT * FROM sys.external_data_sources WHERE name = 'SqlDataPool')
      CREATE EXTERNAL DATA SOURCE SqlDataPool
      WITH (LOCATION = 'sqldatapool://controller-svc/default');
    
  5. 在資料集區中建立名為 web_clickstreams_spark_results 的外部資料表。

    USE Sales
    GO
    IF NOT EXISTS(SELECT * FROM sys.external_tables WHERE name = 'web_clickstreams_spark_results')
       CREATE EXTERNAL TABLE [web_clickstreams_spark_results]
       ("wcs_click_date_sk" BIGINT , "wcs_click_time_sk" BIGINT , "wcs_sales_sk" BIGINT , "wcs_item_sk" BIGINT , "wcs_web_page_sk" BIGINT , "wcs_user_sk" BIGINT)
       WITH
       (
          DATA_SOURCE = SqlDataPool,
          DISTRIBUTION = ROUND_ROBIN
       );
    
  6. 建立資料集區的登入,並提供權限給使用者。

    EXECUTE( ' Use Sales; CREATE LOGIN sample_user  WITH PASSWORD = ''password123!#'' ;') AT  DATA_SOURCE SqlDataPool;
    
    EXECUTE('Use Sales; CREATE USER sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user;  ALTER ROLE [db_datawriter] ADD MEMBER sample_user;') AT DATA_SOURCE SqlDataPool;
    

資料集區外部資料表的建立是封鎖作業。 當指定的資料表已經在所有後端資料集區節點上建立之後,就會恢復控制權。 如果建立作業期間發生失敗,錯誤訊息會傳回給呼叫者。

啟動 Spark 串流作業

下一個步驟是建立 Spark 串流作業,將來自存放集區 (HDFS) 的 Web 點選流資料載入您在資料集區中建立的外部資料表。 這項資料已在將範例資料載入您的巨量資料叢集中新增至 /clickstream_data。

  1. 在 Azure Data Studio 中,連線到巨量資料叢集的主要執行個體。 如需詳細資訊,請參閱連線到巨量資料叢集

  2. 建立新的筆記本並選取 Spark | Scala 作為您的核心。

  3. 執行 Spark 擷取作業

    1. 設定 Spark-SQL 連接器參數

    注意

    如果使用 Active Directory 整合來部署巨量資料叢集,請取代下列 hostname 值,以包含附加至服務名稱的 FQDN。 例如,hostname=master-p-svc.<domainName>

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
    
    // Change per your installation
    val user= "username"
    val password= "****"
    val database =  "MyTestDatabase"
    val sourceDir = "/clickstream_data"
    val datapool_table = "web_clickstreams_spark_results"
    val datasource_name = "SqlDataPool"
    val schema = StructType(Seq(
    StructField("wcs_click_date_sk",LongType,true), StructField("wcs_click_time_sk",LongType,true), 
    StructField("wcs_sales_sk",LongType,true), StructField("wcs_item_sk",LongType,true),
    StructField("wcs_web_page_sk",LongType,true), StructField("wcs_user_sk",LongType,true)
    ))
    
    val hostname = "master-p-svc"
    val port = 1433
    val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"
    
    1. 定義並執行 Spark 作業
      • 每個作業都有兩個部分:readStream 和 writeStream。 以下我們會使用以上定義的結構描述來建立資料框架,然後寫入資料集區中的外部資料表。
      import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
      
      val df = spark.readStream.format("csv").schema(schema).option("header", true).load(sourceDir)
      val query = df.writeStream.outputMode("append").foreachBatch{ (batchDF: DataFrame, batchId: Long) => 
                batchDF.write
                 .format("com.microsoft.sqlserver.jdbc.spark")
                 .mode("append")
                  .option("url", url)
                  .option("dbtable", datapool_table)
                  .option("user", user)
                  .option("password", password)
                  .option("dataPoolDataSource",datasource_name).save()
               }.start()
      
      query.awaitTermination(40000)
      query.stop()
      

查詢資料

下列步驟顯示 Spark 串流作業已將來自 HDFS 的資料載入資料集區。

  1. 在查詢內嵌資料前,請查看 Spark 執行狀態 (包括 Yarn 應用程式識別碼、Spark UI 和驅動程式記錄)。 當您第一次啟動 Spark 應用程式時,此資訊會顯示在筆記本中。

    Spark Execution Details

  2. 返回您在本教學課程開頭開啟的 SQL Server 主要執行個體查詢視窗。

  3. 執行下列查詢,檢查內嵌資料。

    USE Sales
    GO
    SELECT count(*) FROM [web_clickstreams_spark_results];
    SELECT TOP 10 * FROM [web_clickstreams_spark_results];
    
  4. 您也可以在 Spark 中查詢資料。 例如,以下程式碼會印出資料表中的記錄數:

    def df_read(dbtable: String,
                 url: String,
                 dataPoolDataSource: String=""): DataFrame = {
         spark.read
              .format("com.microsoft.sqlserver.jdbc.spark")
              .option("url", url)
              .option("dbtable", dbtable)
              .option("user", user)
              .option("password", password)
              .option("dataPoolDataSource", dataPoolDataSource)
              .load()
              }
    
    val new_df = df_read(datapool_table, url, dataPoolDataSource=datasource_name)
    println("Number of rows is " +  new_df.count)
    

清除

使用下列命令,移除本教學課程所建立的資料庫物件。

DROP EXTERNAL TABLE [dbo].[web_clickstreams_spark_results];

後續步驟

了解如何在 Azure Data Studio 中執行範例筆記本: