Share via


Tutoriel : Ingérer des données dans un pool de données SQL Server avec des travaux Spark

S’applique à : SQL Server 2019 (15.x)

Important

Le module complémentaire Clusters Big Data Microsoft SQL Server 2019 sera mis hors service. La prise en charge de la plateforme Clusters Big Data Microsoft SQL Server 2019 se terminera le 28 février 2025. Tous les utilisateurs existants de SQL Server 2019 avec Software Assurance seront entièrement pris en charge sur la plateforme, et le logiciel continuera à être maintenu par les mises à jour cumulatives SQL Server jusqu’à ce moment-là. Pour plus d’informations, consultez le billet de blog d’annonce et les Options Big Data sur la plateforme Microsoft SQL Server.

Ce tutoriel montre comment utiliser des travaux Spark pour charger des données dans le pool de données d’un Clusters de Big Data SQL Server 2019.

Dans ce tutoriel, vous allez apprendre à :

  • Créer une table externe dans le pool de données
  • Créer un travail Spark pour charger des données à partir de HDFS
  • Interroger les résultats d’une table externe

Conseil

Si vous préférez, vous pouvez télécharger et exécuter un script pour les commandes de ce tutoriel. Pour obtenir des instructions, consultez les exemples de pools de données sur GitHub.

Prérequis

Créer une table externe dans le pool de données

Les étapes suivantes permettent de créer une table externe nommée web_clickstreams_spark_results dans le pool de données. Cette table peut ensuite être utilisée en tant qu’emplacement d’ingestion des données dans le cluster Big Data.

  1. Dans Azure Data Studio, connectez-vous à l’instance maître SQL Server de votre cluster Big Data. Pour plus d’informations, consultez Se connecter à l’instance maître SQL Server.

  2. Double-cliquez sur la connexion dans la fenêtre Serveurs pour afficher le tableau de bord de serveur de l’instance maître SQL Server. Sélectionnez Nouvelle requête.

    Requête d’instance maître SQL Server

  3. Créez des autorisations pour le connecteur MSSQL-Spark.

    USE Sales
    CREATE LOGIN sample_user  WITH PASSWORD ='password123!#' 
    CREATE USER sample_user FROM LOGIN sample_user
    
    -- To create external tables in data pools
    GRANT ALTER ANY EXTERNAL DATA SOURCE TO sample_user;
    
    -- To create external tables
    GRANT CREATE TABLE TO sample_user;
    GRANT ALTER ANY SCHEMA TO sample_user;
    
    -- To view database state for Sales
    GRANT VIEW DATABASE STATE ON DATABASE::Sales TO sample_user;
    
    ALTER ROLE [db_datareader] ADD MEMBER sample_user
    ALTER ROLE [db_datawriter] ADD MEMBER sample_user
    
  4. Créez une source de données externe dans le pool de données, si elle n’existe pas déjà.

    USE Sales
    GO
    IF NOT EXISTS(SELECT * FROM sys.external_data_sources WHERE name = 'SqlDataPool')
      CREATE EXTERNAL DATA SOURCE SqlDataPool
      WITH (LOCATION = 'sqldatapool://controller-svc/default');
    
  5. Créez une table externe nommée web_clickstreams_spark_results dans le pool de données.

    USE Sales
    GO
    IF NOT EXISTS(SELECT * FROM sys.external_tables WHERE name = 'web_clickstreams_spark_results')
       CREATE EXTERNAL TABLE [web_clickstreams_spark_results]
       ("wcs_click_date_sk" BIGINT , "wcs_click_time_sk" BIGINT , "wcs_sales_sk" BIGINT , "wcs_item_sk" BIGINT , "wcs_web_page_sk" BIGINT , "wcs_user_sk" BIGINT)
       WITH
       (
          DATA_SOURCE = SqlDataPool,
          DISTRIBUTION = ROUND_ROBIN
       );
    
  6. Créez une connexion pour les pools de données et fournissez des autorisations à l’utilisateur.

    EXECUTE( ' Use Sales; CREATE LOGIN sample_user  WITH PASSWORD = ''password123!#'' ;') AT  DATA_SOURCE SqlDataPool;
    
    EXECUTE('Use Sales; CREATE USER sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user;  ALTER ROLE [db_datawriter] ADD MEMBER sample_user;') AT DATA_SOURCE SqlDataPool;
    

La création d’une table externe de pool de données est une opération bloquante. Le contrôle renvoie une valeur lorsque la table spécifiée a été créée sur tous les nœuds du pool de données principal. Si une défaillance s’est produite pendant l’opération de création, un message d’erreur est renvoyé à l’appelant.

Démarrer un travail de streaming Spark

L’étape suivante consiste à créer un travail de streaming Spark afin de charger des données de parcours web issues du pool de stockage (HDFS) dans la table externe que vous avez créée dans le pool de données. Ces données ont été ajoutées à /clickstream_data dans Charger des exemples de données dans votre cluster Big Data.

  1. Dans Azure Data Studio, connectez-vous à l’instance maître de votre cluster Big Data. Pour plus d’informations, consultez Se connecter à un cluster Big Data.

  2. Créez un notebook et sélectionnez Spark | Scala comme noyau.

  3. Exécuter le travail d’ingestion Spark

    1. Configurer les paramètres du connecteur Spark-SQL

    Notes

    Si votre cluster Big Data est déployé avec l’intégration Active Directory, remplacez la valeur de nom d’hôte ci-dessous pour inclure le nom de domaine complet ajouté au nom du service. Par ex., hostname=master-p-svc.<domainName>.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
    
    // Change per your installation
    val user= "username"
    val password= "****"
    val database =  "MyTestDatabase"
    val sourceDir = "/clickstream_data"
    val datapool_table = "web_clickstreams_spark_results"
    val datasource_name = "SqlDataPool"
    val schema = StructType(Seq(
    StructField("wcs_click_date_sk",LongType,true), StructField("wcs_click_time_sk",LongType,true), 
    StructField("wcs_sales_sk",LongType,true), StructField("wcs_item_sk",LongType,true),
    StructField("wcs_web_page_sk",LongType,true), StructField("wcs_user_sk",LongType,true)
    ))
    
    val hostname = "master-p-svc"
    val port = 1433
    val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"
    
    1. Définir et exécuter le travail Spark
      • Chaque travail comporte deux parties : readStream et writeStream. Ci-dessous, nous créons une trame de données à l’aide du schéma défini ci-dessus, puis nous écrivons dans la table externe du pool de données.
      import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
      
      val df = spark.readStream.format("csv").schema(schema).option("header", true).load(sourceDir)
      val query = df.writeStream.outputMode("append").foreachBatch{ (batchDF: DataFrame, batchId: Long) => 
                batchDF.write
                 .format("com.microsoft.sqlserver.jdbc.spark")
                 .mode("append")
                  .option("url", url)
                  .option("dbtable", datapool_table)
                  .option("user", user)
                  .option("password", password)
                  .option("dataPoolDataSource",datasource_name).save()
               }.start()
      
      query.awaitTermination(40000)
      query.stop()
      

Interroger les données

Les étapes suivantes montrent que le travail de streaming Spark a chargé les données HDFS dans le pool de données.

  1. Avant d’interroger les données ingérées, examinez l’état d’exécution Spark, y compris l’ID d’application Yarn, l’interface utilisateur Spark et les journaux de pilote. Ces informations s’affichent dans le notebook lors du démarrage initial de l’application Spark.

    Détails de l’exécution du travail

  2. Revenez à la fenêtre de la requête de l’instance maître SQL Server que vous avez ouverte au début de ce tutoriel.

  3. Exécutez la requête suivante pour vérifier les données ingérées.

    USE Sales
    GO
    SELECT count(*) FROM [web_clickstreams_spark_results];
    SELECT TOP 10 * FROM [web_clickstreams_spark_results];
    
  4. Les données peuvent également être interrogées dans Spark. Par exemple, le code ci-dessous imprime le nombre d’enregistrements dans la table :

    def df_read(dbtable: String,
                 url: String,
                 dataPoolDataSource: String=""): DataFrame = {
         spark.read
              .format("com.microsoft.sqlserver.jdbc.spark")
              .option("url", url)
              .option("dbtable", dbtable)
              .option("user", user)
              .option("password", password)
              .option("dataPoolDataSource", dataPoolDataSource)
              .load()
              }
    
    val new_df = df_read(datapool_table, url, dataPoolDataSource=datasource_name)
    println("Number of rows is " +  new_df.count)
    

Nettoyer

Utilisez la commande suivante pour supprimer les objets de base de données créés dans ce tutoriel.

DROP EXTERNAL TABLE [dbo].[web_clickstreams_spark_results];

Étapes suivantes

En savoir plus sur l’exécution d’un exemple de notebook dans Azure Data Studio :