Esercitazione: inserire dati in un pool di dati di SQL Server con processi Spark

Si applica a: SQL Server 2019 (15.x)

Importante

Il componente aggiuntivo per i cluster Big Data di Microsoft SQL Server 2019 verrà ritirato. Il supporto per i cluster Big Data di SQL Server 2019 terminerà il 28 febbraio 2025. Tutti gli utenti esistenti di SQL Server 2019 con Software Assurance saranno completamente supportati nella piattaforma e fino a quel momento il software continuerà a ricevere aggiornamenti cumulativi di SQL Server. Per altre informazioni, vedere il post di blog relativo all'annuncio e Opzioni per i Big Data nella piattaforma Microsoft SQL Server.

Questa esercitazione illustra come usare processi Spark per caricare dati nel pool di dati di un cluster Big Data di SQL Server 2019.

In questa esercitazione apprenderai a:

  • Creare una tabella esterna nel pool di dati.
  • Creare un processo Spark mediante il quale caricare i dati da HDFS.
  • Eseguire una query sui risultati nella tabella esterna.

Suggerimento

Se si preferisce, è possibile scaricare ed eseguire uno script per i comandi descritti in questa esercitazione. Per istruzioni, vedere i pool di dati di esempio in GitHub.

Prerequisiti

Creare una tabella esterna nel pool di dati

Questa procedura consente di creare una tabella esterna nel pool di dati denominata web_clickstreams_spark_results. La tabella può essere quindi usata come posizione per l'inserimento di dati nel cluster Big Data.

  1. In Azure Data Studio connettersi all'istanza master di SQL Server del cluster Big Data. Per altre informazioni, vedere Connettersi all'istanza master di SQL Server.

  2. Fare doppio clic sulla connessione nella finestra Server per visualizzare il dashboard del server per l'istanza master di SQL Server. Selezionare Nuova query.

    SQL Server master instance query

  3. Creare autorizzazioni per il connettore MSSQL-Spark.

    USE Sales
    CREATE LOGIN sample_user  WITH PASSWORD ='password123!#' 
    CREATE USER sample_user FROM LOGIN sample_user
    
    -- To create external tables in data pools
    GRANT ALTER ANY EXTERNAL DATA SOURCE TO sample_user;
    
    -- To create external tables
    GRANT CREATE TABLE TO sample_user;
    GRANT ALTER ANY SCHEMA TO sample_user;
    
    -- To view database state for Sales
    GRANT VIEW DATABASE STATE ON DATABASE::Sales TO sample_user;
    
    ALTER ROLE [db_datareader] ADD MEMBER sample_user
    ALTER ROLE [db_datawriter] ADD MEMBER sample_user
    
  4. Creare un'origine dati esterna nel pool di dati, se non esiste già.

    USE Sales
    GO
    IF NOT EXISTS(SELECT * FROM sys.external_data_sources WHERE name = 'SqlDataPool')
      CREATE EXTERNAL DATA SOURCE SqlDataPool
      WITH (LOCATION = 'sqldatapool://controller-svc/default');
    
  5. Creare una tabella esterna denominata web_clickstreams_spark_results nel pool di dati.

    USE Sales
    GO
    IF NOT EXISTS(SELECT * FROM sys.external_tables WHERE name = 'web_clickstreams_spark_results')
       CREATE EXTERNAL TABLE [web_clickstreams_spark_results]
       ("wcs_click_date_sk" BIGINT , "wcs_click_time_sk" BIGINT , "wcs_sales_sk" BIGINT , "wcs_item_sk" BIGINT , "wcs_web_page_sk" BIGINT , "wcs_user_sk" BIGINT)
       WITH
       (
          DATA_SOURCE = SqlDataPool,
          DISTRIBUTION = ROUND_ROBIN
       );
    
  6. Creare l'account di accesso per i pool di dati e fornire le autorizzazioni all'utente.

    EXECUTE( ' Use Sales; CREATE LOGIN sample_user  WITH PASSWORD = ''password123!#'' ;') AT  DATA_SOURCE SqlDataPool;
    
    EXECUTE('Use Sales; CREATE USER sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user;  ALTER ROLE [db_datawriter] ADD MEMBER sample_user;') AT DATA_SOURCE SqlDataPool;
    

La creazione di una tabella esterna del pool di dati è un'operazione di blocco. Il controllo viene restituito dopo aver completato la creazione della tabella specificata in tutti i nodi del pool di dati back-end. Se si verifica un errore durante l'operazione di creazione, viene restituito un messaggio di errore al chiamante.

Avviare un processo di streaming Spark

Il passaggio successivo consiste nella creazione di un processo di streaming Spark che carica dati clickstream Web dal pool di archiviazione (HDFS) nella tabella esterna creata nel pool di dati. Questi dati sono stati aggiunti a /clickstream_data in Caricare dati di esempio nel cluster Big Data.

  1. In Azure Data Studio connettersi all'istanza master del cluster Big Data. Per altre informazioni, vedere Connettersi a un cluster Big Data.

  2. Creare un nuovo notebook e selezionare Spark | Scala come kernel.

  3. Eseguire il processo di inserimento Spark

    1. Configurare i parametri del connettore Spark-SQL

    Nota

    Se il cluster Big Data viene distribuito con l'integrazione di Active Directory, sostituire il valore di nome host riportato di seguito in modo da includere il nome di dominio completo aggiunto al nome del servizio. Ad esempio, hostname=master-p-svc.<NomeDominio>.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
    
    // Change per your installation
    val user= "username"
    val password= "****"
    val database =  "MyTestDatabase"
    val sourceDir = "/clickstream_data"
    val datapool_table = "web_clickstreams_spark_results"
    val datasource_name = "SqlDataPool"
    val schema = StructType(Seq(
    StructField("wcs_click_date_sk",LongType,true), StructField("wcs_click_time_sk",LongType,true), 
    StructField("wcs_sales_sk",LongType,true), StructField("wcs_item_sk",LongType,true),
    StructField("wcs_web_page_sk",LongType,true), StructField("wcs_user_sk",LongType,true)
    ))
    
    val hostname = "master-p-svc"
    val port = 1433
    val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"
    
    1. Definire ed eseguire il processo Spark
      • Ogni processo è costituito da due parti: readStream e writeStream. Di seguito viene creato un frame di dati usando lo schema definito in precedenza e quindi viene scritto nella tabella esterna nel pool di dati.
      import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
      
      val df = spark.readStream.format("csv").schema(schema).option("header", true).load(sourceDir)
      val query = df.writeStream.outputMode("append").foreachBatch{ (batchDF: DataFrame, batchId: Long) => 
                batchDF.write
                 .format("com.microsoft.sqlserver.jdbc.spark")
                 .mode("append")
                  .option("url", url)
                  .option("dbtable", datapool_table)
                  .option("user", user)
                  .option("password", password)
                  .option("dataPoolDataSource",datasource_name).save()
               }.start()
      
      query.awaitTermination(40000)
      query.stop()
      

Eseguire una query sui dati

Questa procedura dimostra che il processo di streaming Spark ha caricato i dati da HDFS nel pool di dati.

  1. Prima di eseguire query sui dati inseriti, esaminare lo stato di esecuzione di Spark, inclusi l'ID dell'app Yarn, l'interfaccia utente di Spark e i log del driver. Queste informazioni verranno visualizzate nel notebook al primo avvio dell'applicazione Spark.

    Spark Execution Details

  2. Tornare alla finestra di query dell'istanza master di SQL Server aperta all'inizio di questa esercitazione.

  3. Eseguire la query seguente per esaminare i dati inseriti.

    USE Sales
    GO
    SELECT count(*) FROM [web_clickstreams_spark_results];
    SELECT TOP 10 * FROM [web_clickstreams_spark_results];
    
  4. È anche possibile eseguire query sui dati in Spark. Il codice seguente, ad esempio, stampa il numero di record nella tabella:

    def df_read(dbtable: String,
                 url: String,
                 dataPoolDataSource: String=""): DataFrame = {
         spark.read
              .format("com.microsoft.sqlserver.jdbc.spark")
              .option("url", url)
              .option("dbtable", dbtable)
              .option("user", user)
              .option("password", password)
              .option("dataPoolDataSource", dataPoolDataSource)
              .load()
              }
    
    val new_df = df_read(datapool_table, url, dataPoolDataSource=datasource_name)
    println("Number of rows is " +  new_df.count)
    

Eseguire la pulizia

Usare il comando seguente per rimuovere gli oggetti di database creati in questa esercitazione.

DROP EXTERNAL TABLE [dbo].[web_clickstreams_spark_results];

Passaggi successivi

Informazioni su come eseguire un notebook di esempio in Azure Data Studio: