Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Konektor Apache Spark pro SQL Server a Azure SQL je vysoce výkonný konektor, který můžete použít k zahrnutí transakčních dat do analýz velkých objemů dat a zachování výsledků pro ad hoc dotazy nebo vytváření sestav. Pomocí konektoru můžete použít libovolnou databázi SQL, místní nebo cloudovou, jako vstupní zdroj dat nebo výstupní datovou jímku pro úlohy Sparku.
Poznámka:
Tento konektor se aktivně neudržuje. Tento článek se uchovává pouze pro účely archivace.
Tato knihovna obsahuje zdrojový kód konektoru Apache Spark pro PLATFORMY SQL Server a Azure SQL.
Apache Spark je jednotný analytický modul pro zpracování velkých objemů dat.
Prostřednictvím Mavenu jsou k dispozici dvě verze konektoru: kompatibilní verze 2.4.x a kompatibilní verze 3.0.x. Stáhněte konektory z maven.org a importujte je pomocí souřadnic:
| Konektor | Souřadnice Mavenu |
|---|---|
| Konektor kompatibilní se Sparkem 2.4.x | com.microsoft.azure:spark-mssql-connector:1.0.2 |
| Konektor kompatibilní se Sparkem 3.0.x | com.microsoft.azure:spark-mssql-connector_2.12:1.1.0 |
| Konektor kompatibilní se Sparkem 3.1.x | com.microsoft.azure:spark-mssql-connector_2.12:1.2.0 |
Konektor můžete také sestavit ze zdroje nebo stáhnout soubor JAR z části Vydané verze na GitHubu. Nejnovější informace o konektoru najdete v úložišti GitHubu konektoru SQL Spark.
Podporované funkce
- Podpora všech vazeb Sparku (Scala, Python, R)
- Podpora základního ověřování a karty kláves Active Directory (AD)
- Přeuspořádaná podpora zápisu
dataframe - Podpora zapisování do jednotlivé instance SQL Serveru a datového fondu v Clusterech velkých objemů dat SQL Serveru
- Podpora spolehlivého konektoru pro jednu instanci SQL Serveru
| Součást | Podporované verze |
|---|---|
| Apache Spark | 2.4.x, 3.0.x, 3.1.x |
| Scala | 2.11, 2.12 |
| Ovladač Microsoft JDBC pro SQL Server | 8.4 |
| Microsoft SQL Server | SQL Server 2008 nebo novější |
| Azure SQL Databases | Podporováno |
Podporované možnosti
Konektor Apache Spark pro SQL Server a Azure SQL podporuje možnosti definované v článku SQL DataSource JDBC .
Konektor navíc podporuje následující možnosti:
| Možnost | Výchozí | Popis |
|---|---|---|
reliabilityLevel |
BEST_EFFORT |
BEST_EFFORT nebo NO_DUPLICATES.
NO_DUPLICATES implementuje spolehlivé vložení ve scénářích restartování exekutoru. |
dataPoolDataSource |
none |
none znamená, že hodnota není nastavená a konektor by měl zapisovat do jedné instance SQL Serveru. Nastavte tuto hodnotu na název zdroje dat pro zápis tabulky fondu dat v clusterech s velkými objemy dat. |
isolationLevel |
READ_COMMITTED |
Určení úrovně izolace |
tableLock |
false |
Implementuje vložení s TABLOCK možností zlepšení výkonu zápisu. |
schemaCheckEnabled |
true |
Zakáže striktní datový rámec a kontrolu schématu tabulky SQL, pokud je nastavená hodnota false. |
Nastavte další možnosti hromadného kopírování jako možnosti v dataframe. Konektor tyto možnosti bulkcopy předá rozhraním API při zápisu.
Porovnání výkonu
Konektor Apache Spark pro SQL Server a Azure SQL je až 15krát rychlejší než obecný konektor JDBC pro zápis do SQL Serveru. Charakteristiky výkonu se liší podle typu, objemu dat, použitých možností a můžou zobrazovat varianty mezi jednotlivými spuštěními. Následující výsledky výkonu jsou čas potřebný k přepsání tabulky SQL s 143,9M řádky ve Sparku dataframe.
dataframe Spark je vytvořen čtením store_sales tabulky HDFS generované pomocí srovnávacího testu Spark TPCDS. Doba čtení od store_sales do dataframe je vyloučena. Výsledky se průměrují z třech běhů.
| Typ spojnice | Možnosti | Popis | Čas na zápis |
|---|---|---|---|
JDBCConnector |
Výchozí | Obecný konektor JDBC s výchozími možnostmi | 1 385 sekund |
sql-spark-connector |
BEST_EFFORT |
Nejlepší úsilí sql-spark-connector s výchozími možnostmi |
580 sekund |
sql-spark-connector |
NO_DUPLICATES |
Spolehlivý sql-spark-connector |
709 sekund |
sql-spark-connector |
BEST_EFFORT +tabLock=true |
Nejlepší úsilí sql-spark-connector s povoleným zámkem tabulky |
72 sekund |
sql-spark-connector |
NO_DUPLICATES +tabLock=true |
Spolehlivý sql-spark-connector s povoleným zamykáním tabulek |
198 sekund |
Nastavení
- Konfigurace Sparku: num_executors = 20, executor_memory = '1664 m', executor_cores = 2
- Konfigurace Data Gen: měřítko=50, rozdělené_tabule=true
- Datový soubor
store_saless počtem řádků 143 997 590
Životní prostředí
- Cluster s velkými objemy dat SQL Serveru CU5
-
master+ 6 uzlů - Každý uzel serveru Gen-5 s 512 GB RAM, 4 TB NVM na uzel a 10 Gbps NIC
Běžné problémy
java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException
K této chybě dochází při použití starší verze mssql ovladače v prostředí Hadoop. Konektor teď obsahuje tento ovladač. Pokud jste dříve použili konektor Azure SQL a ručně nainstalovali ovladače v clusteru pro kompatibilitu ověřování Microsoft Entra, odeberte tyto ovladače.
Oprava chyby:
Pokud používáte obecné prostředí Hadoop, pomocí následujícího příkazu zkontrolujte a odeberte
mssqlsoubor JAR:rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jarPokud používáte Databricks, přidejte globální nebo clusterový inicializační skript pro odebrání starýchmssqlverzí ovladače ze/databricks/jarssložky nebo přidejte tento řádek do existujícího skriptu:rm /databricks/jars/*mssql*Přidejte balíčky
adal4jamssql. Můžete například použít Maven, ale jakýkoliv způsob by měl fungovat.Upozornění
Tímto způsobem neinstalujte konektor SQL Spark.
Přidejte třídu ovladače do konfigurace připojení. Například:
connectionProperties = { `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver` }`
Pro více informací, podívejte se na řešení https://github.com/microsoft/sql-spark-connector/issues/26.
Začínáme
Konektor Apache Spark pro SQL Server a Azure SQL je založený na rozhraní API Spark DataSourceV1 a rozhraní SQL Server Bulk API. Používá stejné rozhraní jako integrovaný konektor JDBC Spark-SQL. Pomocí této integrace můžete snadno integrovat konektor a migrovat stávající úlohy Sparku aktualizací parametru formátu .com.microsoft.sqlserver.jdbc.spark
Pokud chcete do svých projektů zahrnout konektor, stáhněte si toto úložiště a sestavte soubor JAR pomocí SBT.
Zápis do nové tabulky SQL
Výstraha
Režim overwrite nejprve tabulku zahodí, pokud už v databázi existuje. Tuto možnost používejte opatrně, abyste se vyhnuli neočekávané ztrátě dat.
Pokud použijete režim overwrite bez možnosti truncate při opětovném vytvoření tabulky, operace odebere indexy. Tabulka columnstore se také změní na tabulku haldy. Pokud chcete zachovat existující indexy, nastavte truncate možnost truena hodnotu . Například: .option("truncate","true").
server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Připojení k tabulce SQL
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Určení úrovně izolace
Tento konektor ve výchozím nastavení používá READ_COMMITTED úroveň izolace při hromadném vkládání dat do databáze. Pokud chcete přepsat úroveň izolace, použijte možnost mssqlIsolationLevel:
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
Čtení z tabulky SQL
jdbcDF = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password).load()
Ověřovací systém Microsoft Entra
Příklad Pythonu s instančním objektem
context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]
jdbc_db = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("accessToken", access_token) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Příklad Pythonu s heslem služby Active Directory
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user_name) \
.option("password", password) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Pokud chcete provést ověření pomocí služby Active Directory, nainstalujte požadovanou závislost.
Při použití ActiveDirectoryPassword by měla být hodnota ve formátu UPN, například user.
Pro Scala nainstalujte balíček com.microsoft.aad.adal4j.
Pro Python nainstalujte knihovnu adal . Tato knihovna je k dispozici prostřednictvím nástroje pip.
Příklady najdete v ukázkových poznámkových blocích.