Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Si applica a:SQL Server 2019 (15.x)
Important
I cluster Big Data di Microsoft SQL Server 2019 sono stati ritirati. Il supporto per i cluster Big Data di SQL Server 2019 è terminato a partire dal 28 febbraio 2025. Per altre informazioni, vedere il post di blog sull'annuncio e le opzioni per Big Data nella piattaforma Microsoft SQL Server.
Questa esercitazione illustra come usare i processi Spark per caricare i dati nel pool di dati di un cluster Big Data di SQL Server 2019.
In questa esercitazione si apprenderà come:
- Creare una tabella esterna nel pool di dati.
- Creare un processo Spark per caricare dati da HDFS.
- Interroga i risultati nella tabella esterna.
Tip
Se si preferisce, è possibile scaricare ed eseguire uno script per i comandi di questa esercitazione. Per istruzioni, vedere gli esempi di pool di dati in GitHub.
Prerequisites
-
Strumenti per Big Data
- kubectl
- Azure Data Studio
- Estensione SQL Server 2019
- Caricare dati di esempio nel cluster Big Data
Creare una tabella esterna nel pool di dati
La procedura seguente consente di creare una tabella esterna nel pool di dati denominato web_clickstreams_spark_results. Questa tabella può quindi essere usata come posizione per l'inserimento di dati nel cluster Big Data.
In Azure Data Studio, collegarsi all'istanza master di SQL Server del cluster di Big Data. Per altre informazioni, vedere Connettersi all'istanza master di SQL Server.
Fare doppio clic sulla connessione nella finestra Server per visualizzare il dashboard del server per l'istanza master di SQL Server. Selezionare Nuova query.
Creare le autorizzazioni per MSSQL-Spark Connector.
USE Sales CREATE LOGIN sample_user WITH PASSWORD ='<password>' CREATE USER sample_user FROM LOGIN sample_user -- To create external tables in data pools GRANT ALTER ANY EXTERNAL DATA SOURCE TO sample_user; -- To create external tables GRANT CREATE TABLE TO sample_user; GRANT ALTER ANY SCHEMA TO sample_user; -- To view database state for Sales GRANT VIEW DATABASE STATE ON DATABASE::Sales TO sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user ALTER ROLE [db_datawriter] ADD MEMBER sample_userCreare un'origine dati esterna nel pool di dati, se non esiste già.
USE Sales GO IF NOT EXISTS(SELECT * FROM sys.external_data_sources WHERE name = 'SqlDataPool') CREATE EXTERNAL DATA SOURCE SqlDataPool WITH (LOCATION = 'sqldatapool://controller-svc/default');Creare una tabella esterna denominata web_clickstreams_spark_results nel pool di dati.
USE Sales GO IF NOT EXISTS(SELECT * FROM sys.external_tables WHERE name = 'web_clickstreams_spark_results') CREATE EXTERNAL TABLE [web_clickstreams_spark_results] ("wcs_click_date_sk" BIGINT , "wcs_click_time_sk" BIGINT , "wcs_sales_sk" BIGINT , "wcs_item_sk" BIGINT , "wcs_web_page_sk" BIGINT , "wcs_user_sk" BIGINT) WITH ( DATA_SOURCE = SqlDataPool, DISTRIBUTION = ROUND_ROBIN );Creare l'account di accesso per i pool di dati e fornire le autorizzazioni per l'utente.
EXECUTE( ' Use Sales; CREATE LOGIN sample_user WITH PASSWORD = ''<password>;'') AT DATA_SOURCE SqlDataPool; EXECUTE('Use Sales; CREATE USER sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user; ALTER ROLE [db_datawriter] ADD MEMBER sample_user;') AT DATA_SOURCE SqlDataPool;
La creazione di una tabella esterna del pool di dati è un'operazione di blocco. Il controllo restituisce quando la tabella specificata è stata creata in tutti i nodi del pool di dati back-end. Se si è verificato un errore durante l'operazione di creazione, viene restituito un messaggio di errore al chiamante.
Avviare un processo di streaming Spark
Il passaggio successivo consiste nel creare un processo di streaming Spark che carica i dati clickstream Web dal pool di archiviazione (HDFS) alla tabella esterna creata nel pool di dati. Questi dati sono stati aggiunti a /clickstream_data in Caricare dati di esempio nel cluster Big Data.
In Azure Data Studio, collegatevi all'istanza master del cluster di big data. Per altre informazioni, vedere Connettersi a un cluster Big Data.
Creare un nuovo notebook e selezionare Spark | Scala come kernel.
Eseguire il processo di inserimento Spark
- Configurare i parametri del connettore Spark-SQL
Note
Se il cluster Big Data viene distribuito con l'integrazione di Active Directory, sostituire il valore di nome host riportato di seguito per includere il nome di dominio completo aggiunto al nome del servizio. Ad esempio hostname=master-p-svc.<domainName>.
import org.apache.spark.sql.types._ import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame} // Change per your installation val user= "username" val password= "<password>" val database = "MyTestDatabase" val sourceDir = "/clickstream_data" val datapool_table = "web_clickstreams_spark_results" val datasource_name = "SqlDataPool" val schema = StructType(Seq( StructField("wcs_click_date_sk",LongType,true), StructField("wcs_click_time_sk",LongType,true), StructField("wcs_sales_sk",LongType,true), StructField("wcs_item_sk",LongType,true), StructField("wcs_web_page_sk",LongType,true), StructField("wcs_user_sk",LongType,true) )) val hostname = "master-p-svc" val port = 1433 val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"- Definire ed eseguire il processo Spark
- Ogni processo ha due parti: readStream e writeStream. Di seguito, creiamo un data frame usando lo schema definito in precedenza, e quindi scriviamo nella tabella esterna nel pool di dati.
import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame} val df = spark.readStream.format("csv").schema(schema).option("header", true).load(sourceDir) val query = df.writeStream.outputMode("append").foreachBatch{ (batchDF: DataFrame, batchId: Long) => batchDF.write .format("com.microsoft.sqlserver.jdbc.spark") .mode("append") .option("url", url) .option("dbtable", datapool_table) .option("user", user) .option("password", password) .option("dataPoolDataSource",datasource_name).save() }.start() query.awaitTermination(40000) query.stop()
Eseguire una query sui dati
I passaggi seguenti mostrano che il processo di streaming Spark ha caricato i dati da HDFS nel pool di dati.
Prima di eseguire query sui dati inseriti, esaminare lo stato di esecuzione di Spark, inclusi l'ID app Yarn, l'interfaccia utente spark e i log dei driver. Queste informazioni verranno visualizzate nel notebook quando si avvia per la prima volta l'applicazione Spark.
Tornare alla finestra di query dell'istanza master di SQL Server aperta all'inizio di questa esercitazione.
Eseguire la query seguente per esaminare i dati inseriti.
USE Sales GO SELECT count(*) FROM [web_clickstreams_spark_results]; SELECT TOP 10 * FROM [web_clickstreams_spark_results];I dati possono anche essere sottoposti a query in Spark. Ad esempio, il codice seguente stampa il numero di record nella tabella:
def df_read(dbtable: String, url: String, dataPoolDataSource: String=""): DataFrame = { spark.read .format("com.microsoft.sqlserver.jdbc.spark") .option("url", url) .option("dbtable", dbtable) .option("user", user) .option("password", password) .option("dataPoolDataSource", dataPoolDataSource) .load() } val new_df = df_read(datapool_table, url, dataPoolDataSource=datasource_name) println("Number of rows is " + new_df.count)
Clean up
Usare il comando seguente per rimuovere gli oggetti di database creati in questa esercitazione.
DROP EXTERNAL TABLE [dbo].[web_clickstreams_spark_results];
Next steps
Informazioni su come eseguire un notebook di esempio in Azure Data Studio: