Connettore Apache Spark: SQL Server e Azure SQL
Il connettore Apache Spark per SQL Server e Azure SQL è un connettore ad alte prestazioni che consente di usare i dati transazionali in analisi dei Big Data e di salvare i risultati per la creazione di query o report ad hoc. Il connettore consente di usare qualsiasi database SQL, in locale o nel cloud, come origine dati di input o sink di dati di output per i processi Spark.
Questa libreria contiene il codice sorgente per il connettore Apache Spark per SQL Server e Azure SQL.
Apache Spark è un motore di analisi unificato per l'elaborazione di dati su larga scala.
Sono disponibili due versioni del connettore tramite Maven, una compatibile con 2.4.x e una compatibile con 3.0.x. Entrambe le versioni sono disponibili qui e possono essere importate usando le coordinate seguenti:
Connector | Coordinata Maven |
---|---|
Connettore compatibile con Spark 2.4.x | com.microsoft.azure:spark-mssql-connector:1.0.2 |
Connettore compatibile con Spark 3.0.x | com.microsoft.azure:spark-mssql-connector_2.12:1.1.0 |
Connettore compatibile con Spark 3.1.x | com.microsoft.azure:spark-mssql-connector_2.12:1.2.0 |
È anche possibile creare il connettore dall'origine o scaricare il file con estensione jar dalla sezione Release in GitHub. Per informazioni aggiornate sul connettore, vedere il repository GitHub del connettore SQL Spark.
Funzionalità supportate
- Supporto di tutte le associazioni Spark (Scala, Python, R)
- Supporto di autenticazione di base e keytab Active Directory (AD)
- Supporto della scrittura di
dataframe
riordinato - Supporto della scrittura in Istanza singola e Pool di dati di SQL Server in cluster Big Data di SQL Server
- Supporto di Reliable Connector per Istanza singola di SQL Server
Componente | Versioni supportate |
---|---|
Apache Spark | 2.4.x, 3.0.x, 3.1.x |
Scala | 2.11, 2.12 |
Driver Microsoft JDBC per SQL Server | 8.4 |
Microsoft SQL Server | SQL Server 2008 o versione successiva |
Database SQL di Azure | Supportata |
Opzioni supportate
Il connettore Apache Spark per SQL Server e Azure SQL supporta le opzioni definite qui: SQL DataSource JDBC
Sono supportate anche le opzioni seguenti:
Opzione | Default | Descrizione |
---|---|---|
reliabilityLevel |
BEST_EFFORT |
BEST_EFFORT o NO_DUPLICATES . NO_DUPLICATES implementa un inserimento affidabile negli scenari di riavvio dell'executor |
dataPoolDataSource |
none |
none indica che il valore non è impostato e che il connettore deve scrivere in una singola istanza di SQL Server. Impostare questo valore sul nome dell'origine dati per scrivere in una tabella del pool di dati in cluster Big Data |
isolationLevel |
READ_COMMITTED |
Specificare il livello di isolamento |
tableLock |
false |
Implementa un inserimento con l'opzione TABLOCK per migliorare le prestazioni di scrittura |
schemaCheckEnabled |
true |
Disabilita la verifica rigida dello schema della tabella SQL e del frame di dati quando è impostata su false |
Altre opzioni di copia bulk possono essere impostate come opzioni in dataframe
e verranno passate alle API bulkcopy
durante la scrittura
Confronto delle prestazioni
Il connettore Apache Spark per SQL Server e Azure SQL è 15 volte più veloce del connettore JDBC generico per la scrittura in SQL Server. Le prestazioni variano a seconda del tipo, del volume di dati, delle opzioni usate e possono far registrare variazioni da un'esecuzione all'altra. I risultati di prestazioni seguenti corrispondono al tempo impiegato per sovrascrivere una tabella SQL con 143,9 milioni di righe in un dataframe
Spark. Il dataframe
Spark viene costruito leggendo la tabella HDFS store_sales
generata con il benchmark TPCDS Spark. Il tempo per la lettura da store_sales
a dataframe
non è incluso. I risultati corrispondono a una media calcolata su tre esecuzioni.
Tipo connettore | Opzioni | Descrizione | Tempo di scrittura |
---|---|---|---|
JDBCConnector |
Default | Connettore JDBC generico con opzioni predefinite | 1385 secondi |
sql-spark-connector |
BEST_EFFORT |
sql-spark-connector a massimo sforzo con opzioni predefinite |
580 secondi |
sql-spark-connector |
NO_DUPLICATES |
sql-spark-connector affidabile |
709 secondi |
sql-spark-connector |
BEST_EFFORT + tabLock=true |
sql-spark-connector a massimo sforzo con blocco di tabella abilitato |
72 secondi |
sql-spark-connector |
NO_DUPLICATES + tabLock=true |
sql-spark-connector affidabile con blocco di tabella abilitato |
198 secondi |
Config
- Configurazione Spark: num_executors = 20, executor_memory = '1664 m', executor_cores = 2
- Configurazione generatore dati: scale_factor=50, partitioned_tables=true
- File di dati
store_sales
con 143.997.590 righe
Ambiente
- Cluster Big Data di SQL Server CU5
master
+ 6 nodi- Ogni nodo con server gen 5, 512 GB di RAM, 4 TB di NVM per nodo, NIC 10 GB
Problemi comuni
java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException
Questo problema si verifica quando si usa una versione precedente del driver MSSQL (ora incluso in questo connettore) nell'ambiente Hadoop in uso. Se prima di questo momento veniva usato il connettore Azure SQL precedente e sono stati installati manualmente driver nel cluster per la compatibilità con l’autenticazione Microsoft Entra, sarà necessario rimuovere tali driver.
Passaggi per risolvere il problema:
Se si usa un ambiente Hadoop generico, controllare e rimuovere il file MSSQL con estensione jar:
rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar
. Se si usa Databricks, aggiungere uno script di inizializzazione globale o cluster per rimuovere le versioni precedenti del driver MSSQL dalla cartella/databricks/jars
o aggiungere la riga seguente a uno script esistente:rm /databricks/jars/*mssql*
Aggiungere i pacchetti
adal4j
emssql
. Ad esempio è possibile usare Maven, ma qualsiasi modalità dovrebbe funzionare.Attenzione
Non installare il connettore Spark SQL in questo modo.
Aggiungere la classe driver alla configurazione della connessione. Ad esempio:
connectionProperties = { `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver` }`
Per altre informazioni e spiegazioni, vedere la pagina relativa alla risoluzione di https://github.com/microsoft/sql-spark-connector/issues/26.
Operazioni preliminari
Il connettore Apache Spark per SQL Server e Azure SQL si basa sull'API Spark DataSourceV1 e sull'API SQL Server Bulk e usa la stessa interfaccia del connettore Spark SQL JDBC incorporato. Questa integrazione consente di integrare facilmente il connettore ed eseguire la migrazione dei processi Spark esistenti semplicemente aggiornando il parametro format con com.microsoft.sqlserver.jdbc.spark
.
Per includere il connettore nei progetti, scaricare questo repository e compilare il file con estensione jar usando SBT.
Scrivere in una nuova tabella SQL
Avviso
La modalità overwrite
elimina innanzitutto la tabella se esiste già nel database per impostazione predefinita. Usare questa opzione con cautela per evitare perdite di dati impreviste.
Quando si usa la modalità overwrite
, se non si usa l'opzione truncate
quando si ricrea la tabella, gli indici vanno perduti. Una tabella columnstore diventa ora un heap. Se si vuole mantenere l'indicizzazione esistente, specificare anche l'opzione truncate
con valore true. Ad esempio, .option("truncate","true")
.
server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Accodare alla tabella SQL
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Specificare il livello di isolamento
Per impostazione predefinita, questo connettore usa il livello di isolamento READ_COMMITTED
quando si esegue l'inserimento bulk nel database. Se si vuole eseguire l'override del livello di isolamento, usare l'opzione mssqlIsolationLevel
come illustrato di seguito.
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
Leggere dalla tabella SQL
jdbcDF = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password).load()
Autenticazione Microsoft Entra
Esempio di Python con entità servizio
context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]
jdbc_db = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("accessToken", access_token) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Esempio di Python con password di Active Directory
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user_name) \
.option("password", password) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Per eseguire l'autenticazione con Active Directory, è necessario installare una dipendenza obbligatoria.
Il formato di user
quando si usa ActiveDirectoryPassword deve essere il formato UPN, ad esempio username@domainname.com
.
Per Scala è necessario installare l'artefatto _com.microsoft.aad.adal4j_
.
Per Python è necessario installare la libreria _adal_
. Questa operazione è disponibile tramite PIP.
Per esempi, vedere Notebook di esempio.
Supporto tecnico
Il connettore Apache Spark per Azure SQL e SQL Server è un progetto open source. Questo connettore non dispone di supporto tecnico Microsoft. Per problemi o domande sul connettore, creare una voce nel repository del progetto. La community del connettore è attiva ed elabora le domande inoltrate.
Passaggi successivi
Visitare il repository GitHub del connettore Spark SQL.
Per informazioni sui livelli di isolamento, vedere SET TRANSACTION ISOLATION LEVEL (Transact-SQL).