Eventi
31 mar, 23 - 2 apr, 23
Il più grande evento di apprendimento di SQL, Infrastruttura e Power BI. 31 marzo - 2 aprile. Usare il codice FABINSIDER per salvare $400.
Iscriviti oggi stessoQuesto browser non è più supportato.
Esegui l'aggiornamento a Microsoft Edge per sfruttare i vantaggi di funzionalità più recenti, aggiornamenti della sicurezza e supporto tecnico.
Si applica a: SQL Server 2019 (15.x)
Importante
Il componente aggiuntivo per i cluster Big Data di Microsoft SQL Server 2019 verrà ritirato. Il supporto per i cluster Big Data di SQL Server 2019 terminerà il 28 febbraio 2025. Tutti gli utenti esistenti di SQL Server 2019 con Software Assurance saranno completamente supportati nella piattaforma e fino a quel momento il software continuerà a ricevere aggiornamenti cumulativi di SQL Server. Per altre informazioni, vedere il post di blog relativo all'annuncio e Opzioni per i Big Data nella piattaforma Microsoft SQL Server.
Questa esercitazione illustra come usare processi Spark per caricare dati nel pool di dati di un cluster Big Data di SQL Server 2019.
In questa esercitazione apprenderai a:
Suggerimento
Se si preferisce, è possibile scaricare ed eseguire uno script per i comandi descritti in questa esercitazione. Per istruzioni, vedere i pool di dati di esempio in GitHub.
Questa procedura consente di creare una tabella esterna nel pool di dati denominata web_clickstreams_spark_results. La tabella può essere quindi usata come posizione per l'inserimento di dati nel cluster Big Data.
In Azure Data Studio connettersi all'istanza master di SQL Server del cluster Big Data. Per altre informazioni, vedere Connettersi all'istanza master di SQL Server.
Fare doppio clic sulla connessione nella finestra Server per visualizzare il dashboard del server per l'istanza master di SQL Server. Selezionare Nuova query.
Creare autorizzazioni per il connettore MSSQL-Spark.
USE Sales
CREATE LOGIN sample_user WITH PASSWORD ='<password>'
CREATE USER sample_user FROM LOGIN sample_user
-- To create external tables in data pools
GRANT ALTER ANY EXTERNAL DATA SOURCE TO sample_user;
-- To create external tables
GRANT CREATE TABLE TO sample_user;
GRANT ALTER ANY SCHEMA TO sample_user;
-- To view database state for Sales
GRANT VIEW DATABASE STATE ON DATABASE::Sales TO sample_user;
ALTER ROLE [db_datareader] ADD MEMBER sample_user
ALTER ROLE [db_datawriter] ADD MEMBER sample_user
Creare un'origine dati esterna nel pool di dati, se non esiste già.
USE Sales
GO
IF NOT EXISTS(SELECT * FROM sys.external_data_sources WHERE name = 'SqlDataPool')
CREATE EXTERNAL DATA SOURCE SqlDataPool
WITH (LOCATION = 'sqldatapool://controller-svc/default');
Creare una tabella esterna denominata web_clickstreams_spark_results nel pool di dati.
USE Sales
GO
IF NOT EXISTS(SELECT * FROM sys.external_tables WHERE name = 'web_clickstreams_spark_results')
CREATE EXTERNAL TABLE [web_clickstreams_spark_results]
("wcs_click_date_sk" BIGINT , "wcs_click_time_sk" BIGINT , "wcs_sales_sk" BIGINT , "wcs_item_sk" BIGINT , "wcs_web_page_sk" BIGINT , "wcs_user_sk" BIGINT)
WITH
(
DATA_SOURCE = SqlDataPool,
DISTRIBUTION = ROUND_ROBIN
);
Creare l'account di accesso per i pool di dati e fornire le autorizzazioni all'utente.
EXECUTE( ' Use Sales; CREATE LOGIN sample_user WITH PASSWORD = ''<password>;'') AT DATA_SOURCE SqlDataPool;
EXECUTE('Use Sales; CREATE USER sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user; ALTER ROLE [db_datawriter] ADD MEMBER sample_user;') AT DATA_SOURCE SqlDataPool;
La creazione di una tabella esterna del pool di dati è un'operazione di blocco. Il controllo viene restituito dopo aver completato la creazione della tabella specificata in tutti i nodi del pool di dati back-end. Se si verifica un errore durante l'operazione di creazione, viene restituito un messaggio di errore al chiamante.
Il passaggio successivo consiste nella creazione di un processo di streaming Spark che carica dati clickstream Web dal pool di archiviazione (HDFS) nella tabella esterna creata nel pool di dati. Questi dati sono stati aggiunti a /clickstream_data in Caricare dati di esempio nel cluster Big Data.
In Azure Data Studio connettersi all'istanza master del cluster Big Data. Per altre informazioni, vedere Connettersi a un cluster Big Data.
Creare un nuovo notebook e selezionare Spark | Scala come kernel.
Eseguire il processo di inserimento Spark
Nota
Se il cluster Big Data viene distribuito con l'integrazione di Active Directory, sostituire il valore di nome host riportato di seguito in modo da includere il nome di dominio completo aggiunto al nome del servizio. Ad esempio, hostname=master-p-svc.<NomeDominio>.
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
// Change per your installation
val user= "username"
val password= "<password>"
val database = "MyTestDatabase"
val sourceDir = "/clickstream_data"
val datapool_table = "web_clickstreams_spark_results"
val datasource_name = "SqlDataPool"
val schema = StructType(Seq(
StructField("wcs_click_date_sk",LongType,true), StructField("wcs_click_time_sk",LongType,true),
StructField("wcs_sales_sk",LongType,true), StructField("wcs_item_sk",LongType,true),
StructField("wcs_web_page_sk",LongType,true), StructField("wcs_user_sk",LongType,true)
))
val hostname = "master-p-svc"
val port = 1433
val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"
import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
val df = spark.readStream.format("csv").schema(schema).option("header", true).load(sourceDir)
val query = df.writeStream.outputMode("append").foreachBatch{ (batchDF: DataFrame, batchId: Long) =>
batchDF.write
.format("com.microsoft.sqlserver.jdbc.spark")
.mode("append")
.option("url", url)
.option("dbtable", datapool_table)
.option("user", user)
.option("password", password)
.option("dataPoolDataSource",datasource_name).save()
}.start()
query.awaitTermination(40000)
query.stop()
Questa procedura dimostra che il processo di streaming Spark ha caricato i dati da HDFS nel pool di dati.
Prima di eseguire query sui dati inseriti, esaminare lo stato di esecuzione di Spark, inclusi l'ID dell'app Yarn, l'interfaccia utente di Spark e i log del driver. Queste informazioni verranno visualizzate nel notebook al primo avvio dell'applicazione Spark.
Tornare alla finestra di query dell'istanza master di SQL Server aperta all'inizio di questa esercitazione.
Eseguire la query seguente per esaminare i dati inseriti.
USE Sales
GO
SELECT count(*) FROM [web_clickstreams_spark_results];
SELECT TOP 10 * FROM [web_clickstreams_spark_results];
È anche possibile eseguire query sui dati in Spark. Il codice seguente, ad esempio, stampa il numero di record nella tabella:
def df_read(dbtable: String,
url: String,
dataPoolDataSource: String=""): DataFrame = {
spark.read
.format("com.microsoft.sqlserver.jdbc.spark")
.option("url", url)
.option("dbtable", dbtable)
.option("user", user)
.option("password", password)
.option("dataPoolDataSource", dataPoolDataSource)
.load()
}
val new_df = df_read(datapool_table, url, dataPoolDataSource=datasource_name)
println("Number of rows is " + new_df.count)
Usare il comando seguente per rimuovere gli oggetti di database creati in questa esercitazione.
DROP EXTERNAL TABLE [dbo].[web_clickstreams_spark_results];
Informazioni su come eseguire un notebook di esempio in Azure Data Studio:
Eventi
31 mar, 23 - 2 apr, 23
Il più grande evento di apprendimento di SQL, Infrastruttura e Power BI. 31 marzo - 2 aprile. Usare il codice FABINSIDER per salvare $400.
Iscriviti oggi stessoFormazione
Modulo
Integrare pool SQL e di Apache Spark in Azure Synapse Analytics - Training
Integrare pool SQL e di Apache Spark in Azure Synapse Analytics
Certificazione
Microsoft Certified: Azure Data Engineer Associate - Certifications
Dimostrare la comprensione delle attività comuni di ingegneria dei dati per implementare e gestire carichi di lavoro di ingegneria dei dati in Microsoft Azure, usando vari servizi di Azure.