Apache Spark-Connector: SQL Server & Azure SQL
Der Apache Spark-Connector für SQL Server und Azure SQL ist ein Hochleistungsconnector, der es Ihnen ermöglicht, Transaktionsdaten in Big Data-Analysen zu nutzen und Ergebnisse für Ad-hoc-Abfragen oder Berichte zu speichern. Der Connector ermöglicht Ihnen die Verwendung einer beliebigen SQL-Datenbank (lokal oder in der Cloud) als Eingabedatenquelle oder als Ausgabedatensenke für Spark-Aufträge.
Diese Bibliothek enthält den Quellcode für den Apache Spark-Connector für SQL Server und Azure SQL.
Apache Spark ist eine vereinheitlichte Engine zur Verarbeitung von umfangreichen Daten.
Über Maven sind zwei Versionen des Connectors verfügbar: eine mit 2.4.x kompatible Version und eine mit 3.0.x kompatible Version. Beide Versionen sind hier erhältlich und können mit den folgenden Koordinaten importiert werden:
Connector | Maven-Koordinate |
---|---|
Mit Spark 2.4.x kompatibler Connector | com.microsoft.azure:spark-mssql-connector:1.0.2 |
Mit Spark 3.0.x kompatibler Connector | com.microsoft.azure:spark-mssql-connector_2.12:1.1.0 |
Mit Spark 3.1.x kompatibler Connector | com.microsoft.azure:spark-mssql-connector_2.12:1.2.0 |
Ferner können Sie den Connector unter Verwendung einer Quelle erstellen oder die JAR-Datei aus dem Releaseabschnitt in GitHub herunterladen. Aktuelle Informationen zum Connector finden Sie unter dem SQL Spark-Connector im GitHub-Repository.
Unterstützte Funktionen
- Unterstützung für alle Spark-Bindungen (Scala, Python, R)
- Unterstützung der Standardauthentifizierung und der Active Directory (AD)-Schlüsseltabelle
- Geänderte Unterstützung für
dataframe
-Schreibvorgänge - Unterstützung von Schreibvorgängen in SQL Server-Einzelinstanzen und Datenpools in Big Data-Clustern von SQL Server
- Zuverlässige Connectorunterstützung für SQL Server-Einzelinstanzen
Komponente | Unterstützte Versionen |
---|---|
Apache Spark | 2.4.x, 3.0.x, 3.1.x |
Scala | 2.11, 2.12 |
Microsoft JDBC-Treiber für SQL Server | 8,4 |
Microsoft SQL Server | SQL Server 2008 oder höher |
Azure SQL-Datenbank-Instanzen | Unterstützt |
Unterstützte Optionen
Der Apache Spark-Connector für SQL Server und Azure SQL unterstützt die hier definierten Optionen: SQL DataSource JDBC
Darüber hinaus werden die folgenden Optionen unterstützt
Option | Standard | BESCHREIBUNG |
---|---|---|
reliabilityLevel |
BEST_EFFORT |
BEST_EFFORT oder NO_DUPLICATES . NO_DUPLICATES implementiert eine zuverlässige Einfügung in Neustartszenarios von Executor. |
dataPoolDataSource |
none |
none impliziert, dass der Wert nicht festgelegt ist und dass der Connector in eine SQL Server-Einzelinstanz schreiben soll. Legen Sie diesen Wert auf einen Datenquellennamen fest, um eine Datenpooltabelle in Big Data-Clustern zu schreiben. |
isolationLevel |
READ_COMMITTED |
Angeben der Isolationsstufe |
tableLock |
false |
Implementiert eine Einfügung mit der TABLOCK -Option, um die Schreibleistung zu verbessern. |
schemaCheckEnabled |
true |
Deaktiviert die strikte Schemaüberprüfung für Datenrahmen und SQL-Tabellen, wenn false festgelegt ist. |
Andere Optionen zum Massenkopieren können für dataframe
als Optionen festgelegt werden und werden beim Schreiben an die bulkcopy
-APIs übergeben.
Leistungsvergleich
Der Apache Spark-Connector für SQL Server und Azure SQL ist beim Schreiben in SQL Server bis zu 15 Mal schneller als der generische JDBC-Connector. Die Leistungsmerkmale variieren abhängig vom Typ, der Datenmenge und den verwendeten Optionen und können von Ausführung zu Ausführung variieren. Die folgenden Leistungsergebnisse entsprechen der Zeit, die zum Überschreiben einer SQL-Tabelle mit 143,9 Mio. Zeilen in einem Spark-dataframe
benötigt wurde. Der Spark-dataframe
wird erstellt, indem die mit dem Spark-TPCDS-Vergleichstest generierte store_sales
-HDFS-Tabelle gelesen wird. Die Zeit zum Lesen von store_sales
in dataframe
wird ausgeschlossen. Die Ergebnisse werden über drei Ausführungen gemittelt.
Connectortyp | Optionen | BESCHREIBUNG | Schreibdauer |
---|---|---|---|
JDBCConnector |
Standard | Generischer JDBC-Connector mit Standardoptionen | 1\.385 Sekunden |
sql-spark-connector |
BEST_EFFORT |
Bestmögliche sql-spark-connector -Leistung mit Standardoptionen |
580 Sekunden |
sql-spark-connector |
NO_DUPLICATES |
Zuverlässige sql-spark-connector -Leistung |
709 Sekunden |
sql-spark-connector |
BEST_EFFORT + tabLock=true |
Bestmögliche sql-spark-connector -Leistung mit aktivierter Tabellensperre |
72 Sekunden |
sql-spark-connector |
NO_DUPLICATES + tabLock=true |
Zuverlässige sql-spark-connector -Leistung mit aktivierter Tabellensperre |
198 Sekunden |
Konfigurationen
- Spark config: num_executors = 20, executor_memory = '1664 m', executor_cores = 2
- Data Gen config: scale_factor=50, partitioned_tables=true
- Datendatei
store_sales
mit 143.997.590 Zeilen
Umgebung
- Big Data-Cluster von SQL Server CU5
master
+ 6 Knoten- Jeder Knoten auf einem Gen 5-Server verfügt über 512 GB RAM, 4 TB NVM pro Knoten und eine 10 GB-NIC
Häufige Probleme
java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException
Dieses Problem tritt bei Verwendung einer älteren Version des MSSQL-Treibers (der jetzt in diesem Connector enthalten ist) in einer Hadoop-Umgebung auf. Wenn Sie bisher den vorherigen Azure SQL-Connector verwendet und in diesem Cluster Treiber manuell installiert haben, um Kompatibilität mit der Microsoft Entra-Authentifizierung zu gewährleisten, müssen Sie diese Treiber entfernen.
Schritte zum Beheben des Problems:
Wenn Sie eine generische Hadoop-Umgebung verwenden, überprüfen Sie die MSSQL-JAR-Datei
rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar
, und entfernen Sie diese. Wenn Sie Databricks verwenden, fügen Sie ein globales oder Clusterinitialisierungsskript hinzu, um alte Versionen des MSSQL-Treibers aus dem Ordner/databricks/jars
zu entfernen, oder fügen Sie die folgende Zeile in ein vorhandenes Skript ein:rm /databricks/jars/*mssql*
Fügen Sie die Pakete
adal4j
undmssql
hinzu. Sie können z. B. Maven verwenden, allerdings sollten auch andere Tools funktionieren.Achtung
Der SQL Spark-Connector sollte nicht auf diese Weise installiert werden.
Fügen Sie der Verbindungskonfiguration die Treiberklasse hinzu. Beispiel:
connectionProperties = { `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver` }`
Weitere Informationen und Erläuterungen finden Sie im Abschnitt mit der Lösung zu https://github.com/microsoft/sql-spark-connector/issues/26.
Erste Schritte
Der Apache Spark-Connector für SQL Server und Azure SQL basiert auf der Spark DataSourceV1-API und der SQL Server-Bulk-API und verwendet dieselbe Schnittstelle wie der integrierte JDBC-Spark-SQL-Connector. Mit dieser Integration können Sie den Connector problemlos integrieren und vorhandene Spark-Aufträge migrieren, indem Sie den format-Parameter einfach mit com.microsoft.sqlserver.jdbc.spark
aktualisieren.
Um den Connector in Ihre Projekte einzubinden, laden Sie dieses Repository herunter, und erstellen Sie die JAR-Datei mithilfe von SBT.
Schreiben in eine neue SQL-Tabelle
Warnung
Im overwrite
-Modus wird standardmäßig zuerst die Tabelle gelöscht, wenn sie bereits in der Datenbank vorhanden ist. Verwenden Sie diese Option mit der gebotenen Sorgfalt, um unerwartete Datenverluste zu vermeiden.
Wenn Sie bei Verwendung des overwrite
-Modus nicht die Option truncate
verwenden, gehen bei der erneuten Erstellung der Tabelle Indizes verloren. So wäre eine columnstore-Tabelle nun ein Heap. Wenn Sie die vorhandene Indizierung beibehalten möchten, müssen Sie auch die Option truncate
mit dem Wert „true“ angeben. Beispiel: .option("truncate","true")
.
server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Anfügen an eine SQL-Tabelle
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Angeben der Isolationsstufe
Beim Durchführen der Masseneinfügung in die Datenbank verwendet dieser Connector standardmäßig die Isolationsstufe READ_COMMITTED
. Wenn Sie die Isolationsstufe außer Kraft setzen möchten, verwenden Sie die Option mssqlIsolationLevel
, wie unten gezeigt.
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
Lesen aus einer SQL-Tabelle
jdbcDF = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password).load()
Microsoft Entra-Authentifizierung
Python-Beispiel mit Dienstprinzipal
context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]
jdbc_db = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("accessToken", access_token) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Python-Beispiel mit Active Directory-Kennwort
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user_name) \
.option("password", password) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Eine erforderliche Abhängigkeit muss installiert werden, um die Authentifizierung mit Active Directory zu unterstützen.
user
sollte bei Verwendung von ActiveDirectoryPassword das UPN-Format aufweisen. Beispiel: username@domainname.com
.
Für Scala muss das _com.microsoft.aad.adal4j_
-Artefakt installiert werden.
Für Python muss die _adal_
-Bibliothek installiert werden. Diese ist über pip verfügbar.
In den Beispielnotebooks finden Sie Beispiele.
Support
Der Apache Spark-Connector für Azure SQL und SQL Server ist ein Open-Source-Projekt. Dieser Connector wird nicht durch den Microsoft-Support unterstützt. Wenn Sie Probleme oder Fragen zum Connector haben, erstellen Sie einen Issue in diesem Projektrepository. Die Connectorcommunity ist aktiv und überwacht eingereichte Issues.
Nächste Schritte
Besuchen Sie das GitHub-Repository für den SQL Spark-Connector.
Informationen zu Isolationsstufen in SQL Server finden Sie unter SET TRANSACTION ISOLATION LEVEL (Transact-SQL).