Sdílet prostřednictvím


Konektor pro Apache Spark: SQL Server a Azure SQL

Konektor Apache Spark pro SQL Server a Azure SQL je vysoce výkonný konektor, který můžete použít k zahrnutí transakčních dat do analýz velkých objemů dat a zachování výsledků pro ad hoc dotazy nebo vytváření sestav. Pomocí konektoru můžete použít libovolnou databázi SQL, místní nebo cloudovou, jako vstupní zdroj dat nebo výstupní datovou jímku pro úlohy Sparku.

Poznámka:

Tento konektor se aktivně neudržuje. Tento článek se uchovává pouze pro účely archivace.

Tato knihovna obsahuje zdrojový kód konektoru Apache Spark pro PLATFORMY SQL Server a Azure SQL.

Apache Spark je jednotný analytický modul pro zpracování velkých objemů dat.

Prostřednictvím Mavenu jsou k dispozici dvě verze konektoru: kompatibilní verze 2.4.x a kompatibilní verze 3.0.x. Stáhněte konektory z maven.org a importujte je pomocí souřadnic:

Konektor Souřadnice Mavenu
Konektor kompatibilní se Sparkem 2.4.x com.microsoft.azure:spark-mssql-connector:1.0.2
Konektor kompatibilní se Sparkem 3.0.x com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Konektor kompatibilní se Sparkem 3.1.x com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

Konektor můžete také sestavit ze zdroje nebo stáhnout soubor JAR z části Vydané verze na GitHubu. Nejnovější informace o konektoru najdete v úložišti GitHubu konektoru SQL Spark.

Podporované funkce

  • Podpora všech vazeb Sparku (Scala, Python, R)
  • Podpora základního ověřování a karty kláves Active Directory (AD)
  • Přeuspořádaná podpora zápisu dataframe
  • Podpora zapisování do jednotlivé instance SQL Serveru a datového fondu v Clusterech velkých objemů dat SQL Serveru
  • Podpora spolehlivého konektoru pro jednu instanci SQL Serveru
Součást Podporované verze
Apache Spark 2.4.x, 3.0.x, 3.1.x
Scala 2.11, 2.12
Ovladač Microsoft JDBC pro SQL Server 8.4
Microsoft SQL Server SQL Server 2008 nebo novější
Azure SQL Databases Podporováno

Podporované možnosti

Konektor Apache Spark pro SQL Server a Azure SQL podporuje možnosti definované v článku SQL DataSource JDBC .

Konektor navíc podporuje následující možnosti:

Možnost Výchozí Popis
reliabilityLevel BEST_EFFORT BEST_EFFORT nebo NO_DUPLICATES. NO_DUPLICATES implementuje spolehlivé vložení ve scénářích restartování exekutoru.
dataPoolDataSource none none znamená, že hodnota není nastavená a konektor by měl zapisovat do jedné instance SQL Serveru. Nastavte tuto hodnotu na název zdroje dat pro zápis tabulky fondu dat v clusterech s velkými objemy dat.
isolationLevel READ_COMMITTED Určení úrovně izolace
tableLock false Implementuje vložení s TABLOCK možností zlepšení výkonu zápisu.
schemaCheckEnabled true Zakáže striktní datový rámec a kontrolu schématu tabulky SQL, pokud je nastavená hodnota false.

Nastavte další možnosti hromadného kopírování jako možnosti v dataframe. Konektor tyto možnosti bulkcopy předá rozhraním API při zápisu.

Porovnání výkonu

Konektor Apache Spark pro SQL Server a Azure SQL je až 15krát rychlejší než obecný konektor JDBC pro zápis do SQL Serveru. Charakteristiky výkonu se liší podle typu, objemu dat, použitých možností a můžou zobrazovat varianty mezi jednotlivými spuštěními. Následující výsledky výkonu jsou čas potřebný k přepsání tabulky SQL s 143,9M řádky ve Sparku dataframe. dataframe Spark je vytvořen čtením store_sales tabulky HDFS generované pomocí srovnávacího testu Spark TPCDS. Doba čtení od store_sales do dataframe je vyloučena. Výsledky se průměrují z třech běhů.

Typ spojnice Možnosti Popis Čas na zápis
JDBCConnector Výchozí Obecný konektor JDBC s výchozími možnostmi 1 385 sekund
sql-spark-connector BEST_EFFORT Nejlepší úsilí sql-spark-connector s výchozími možnostmi 580 sekund
sql-spark-connector NO_DUPLICATES Spolehlivý sql-spark-connector 709 sekund
sql-spark-connector BEST_EFFORT +tabLock=true Nejlepší úsilí sql-spark-connector s povoleným zámkem tabulky 72 sekund
sql-spark-connector NO_DUPLICATES +tabLock=true Spolehlivý sql-spark-connector s povoleným zamykáním tabulek 198 sekund

Nastavení

  • Konfigurace Sparku: num_executors = 20, executor_memory = '1664 m', executor_cores = 2
  • Konfigurace Data Gen: měřítko=50, rozdělené_tabule=true
  • Datový soubor store_sales s počtem řádků 143 997 590

Životní prostředí

Běžné problémy

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

K této chybě dochází při použití starší verze mssql ovladače v prostředí Hadoop. Konektor teď obsahuje tento ovladač. Pokud jste dříve použili konektor Azure SQL a ručně nainstalovali ovladače v clusteru pro kompatibilitu ověřování Microsoft Entra, odeberte tyto ovladače.

Oprava chyby:

  1. Pokud používáte obecné prostředí Hadoop, pomocí následujícího příkazu zkontrolujte a odeberte mssql soubor JAR: rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar Pokud používáte Databricks, přidejte globální nebo clusterový inicializační skript pro odebrání starých mssql verzí ovladače ze /databricks/jars složky nebo přidejte tento řádek do existujícího skriptu: rm /databricks/jars/*mssql*

  2. Přidejte balíčky adal4j a mssql. Můžete například použít Maven, ale jakýkoliv způsob by měl fungovat.

    Upozornění

    Tímto způsobem neinstalujte konektor SQL Spark.

  3. Přidejte třídu ovladače do konfigurace připojení. Například:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

Pro více informací, podívejte se na řešení https://github.com/microsoft/sql-spark-connector/issues/26.

Začínáme

Konektor Apache Spark pro SQL Server a Azure SQL je založený na rozhraní API Spark DataSourceV1 a rozhraní SQL Server Bulk API. Používá stejné rozhraní jako integrovaný konektor JDBC Spark-SQL. Pomocí této integrace můžete snadno integrovat konektor a migrovat stávající úlohy Sparku aktualizací parametru formátu .com.microsoft.sqlserver.jdbc.spark

Pokud chcete do svých projektů zahrnout konektor, stáhněte si toto úložiště a sestavte soubor JAR pomocí SBT.

Zápis do nové tabulky SQL

Výstraha

Režim overwrite nejprve tabulku zahodí, pokud už v databázi existuje. Tuto možnost používejte opatrně, abyste se vyhnuli neočekávané ztrátě dat.

Pokud použijete režim overwrite bez možnosti truncate při opětovném vytvoření tabulky, operace odebere indexy. Tabulka columnstore se také změní na tabulku haldy. Pokud chcete zachovat existující indexy, nastavte truncate možnost truena hodnotu . Například: .option("truncate","true").

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Připojení k tabulce SQL

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Určení úrovně izolace

Tento konektor ve výchozím nastavení používá READ_COMMITTED úroveň izolace při hromadném vkládání dat do databáze. Pokud chcete přepsat úroveň izolace, použijte možnost mssqlIsolationLevel:

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

Čtení z tabulky SQL

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Ověřovací systém Microsoft Entra

Příklad Pythonu s instančním objektem

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Příklad Pythonu s heslem služby Active Directory

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Pokud chcete provést ověření pomocí služby Active Directory, nainstalujte požadovanou závislost.

Při použití ActiveDirectoryPassword by měla být hodnota ve formátu UPN, například user.

Pro Scala nainstalujte balíček com.microsoft.aad.adal4j.

Pro Python nainstalujte knihovnu adal . Tato knihovna je k dispozici prostřednictvím nástroje pip.

Příklady najdete v ukázkových poznámkových blocích.