Dela via


Apache Spark-anslutningsprogram: SQL Server och Azure SQL

Apache Spark-anslutningsappen för SQL Server och Azure SQL är en anslutningsapp med höga prestanda som gör att du kan använda transaktionsdata i stordataanalys och spara resultat för ad hoc-frågor eller rapportering. Med anslutningsappen kan du använda valfri SQL-databas, lokalt eller i molnet, som indatakälla eller utdatamottagare för Spark-jobb.

Anmärkning

Den här anslutningsappen underhålls inte aktivt. Den här artikeln behålls endast i arkiveringssyfte.

Det här biblioteket innehåller källkoden för Apache Spark Connector för SQL Server- och Azure SQL-plattformar.

Apache Spark är en enhetlig analysmotor för storskalig databearbetning.

Det finns två versioner av anslutningsappen som är tillgängliga via Maven, en 2.4.x-kompatibel version och en 3.0.x-kompatibel version. Ladda ned anslutningsapparna från maven.org och importera dem med hjälp av koordinater:

Anslutning Maven-koordinat
Spark 2.4.x-kompatibel anslutning com.microsoft.azure:spark-mssql-connector:1.0.2
Spark 3.0.x-kompatibel anslutning com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Spark 3.1.x-kompatibel anslutning com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

Du kan också skapa anslutningsappen från källan eller ladda ned jar-filen från avsnittet Release i GitHub. Den senaste informationen om anslutningsappen finns i GitHub-lagringsplatsen för SQL Spark-anslutningsappen.

Funktioner som stöds

  • Stöd för alla Spark-bindningar (Scala, Python, R)
  • Stöd för grundläggande autentisering och Active Directory-nyckelflik (AD)
  • Omstrukturerat dataframe skrivstöd
  • Stöd för skrivning till en enskild SQL Server-instans och datapool i SQL Server-stordatakluster
  • Tillförlitligt stöd för kontakt för SQL Server Single Instance
Komponent Versioner som stöds
Apache Spark 2.4.x, 3.0.x, 3.1.x
Scala 2.11, 2.12
Microsoft JDBC-drivrutin för SQL Server 8,4
Microsoft SQL Server SQL Server 2008 eller senare
Azure SQL Databases Understödd

Alternativ som stöds

Apache Spark Connector för SQL Server och Azure SQL stöder de alternativ som definieras här: SQL DataSource JDBC

Dessutom stöds följande alternativ:

Alternativ Förinställning Beskrivning
reliabilityLevel BEST_EFFORT BEST_EFFORT eller NO_DUPLICATES. NO_DUPLICATES implementerar en tillförlitlig infogning i utförarnystartsscenarier
dataPoolDataSource none none antyder att värdet inte har angetts och att anslutningsappen ska skriva till en enskild SQL Server-instans. Ange det här värdet till datakällans namn för att skriva en datapooltabell i stordatakluster
isolationLevel READ_COMMITTED Ange isoleringsnivå
tableLock false Implementerar en infogning med TABLOCK alternativ för att förbättra skrivprestanda
schemaCheckEnabled true Inaktiverar strikt dataram och sql-tabellschemakontroll när värdet är falskt

Andra alternativ för masskopiering kan anges som alternativ för dataframe och skickas till bulkcopy API:er vid skrivning

Prestandajämförelse

Apache Spark Connector för SQL Server och Azure SQL är upp till 15 gånger snabbare än den allmänna JDBC-anslutningsappen för skrivning till SQL Server. Prestanda varierar beroende på typ, volym av data, använda alternativ och kan visa variationer från körning till körning. Följande prestandaresultat är den tid det tar att skriva över en SQL-tabell med 143,9 miljoner rader i en spark dataframe. Spark dataframe skapas genom att läsa store_sales HDFS-tabellen som genereras med spark TPCDS Benchmark. Den tid det tar att läsa store_sales till dataframe är undantagen. Resultaten beräknas som ett genomsnitt över tre körningar.

Anslutningstyp Alternativ Beskrivning Tid att skriva
JDBCConnector Förinställning Allmän JDBC-anslutning med standardalternativ 1 385 sekunder
sql-spark-connector BEST_EFFORT Bästa möjliga arbete sql-spark-connector med standardalternativ 580 sekunder
sql-spark-connector NO_DUPLICATES Pålitlig sql-spark-connector 709 sekunder
sql-spark-connector BEST_EFFORT + tabLock=true Bästa möjliga arbete sql-spark-connector med tabelllås aktiverat 72 sekunder
sql-spark-connector NO_DUPLICATES + tabLock=true Tillförlitlig sql-spark-connector med tabelllås aktiverat 198 sekunder

Konfiguration

  • Spark-konfiguration: num_executors = 20, executor_memory = "1664 m", executor_cores = 2
  • Konfiguration av Data Gen: scale_factor=50, partitioned_tables=true
  • Datafil store_sales med nr av rader 143 997 590

Miljö

Vanliga problem

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

Det här problemet uppstår när du använder en äldre version av mssql-drivrutinen (som nu ingår i den här anslutningsappen) i din hadoop-miljö. Om du kommer från att använda den tidigare Azure SQL Connector och har manuellt installerade drivrutiner på klustret för Microsoft Entra-autentiseringskompatibilitet måste du ta bort dessa drivrutiner.

Steg för att åtgärda problemet:

  1. Om du använder en generisk Hadoop-miljö, kontrollera och ta bort mssql jar: rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar. Om du använder Databricks lägger du till ett globalt init-skript eller klusterinitskript för att ta bort gamla versioner av mssql-drivrutinen från /databricks/jars mappen eller lägga till den här raden i ett befintligt skript: rm /databricks/jars/*mssql*

  2. Lägg till paketen adal4j och mssql . Du kan till exempel använda Maven, men på vilket sätt som helst bör det fungera.

    Försiktighet

    Installera inte SQL Spark-anslutningsappen på det här sättet.

  3. Lägg till drivrutinsklassen i anslutningskonfigurationen. Till exempel:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

Mer information och förklaring finns i lösningen till https://github.com/microsoft/sql-spark-connector/issues/26.

Kom igång

Apache Spark Connector för SQL Server och Azure SQL baseras på Spark DataSourceV1 API och SQL Server Bulk API och använder samma gränssnitt som den inbyggda JDBC-Spark-SQL-anslutningsappen. Med den här integreringen kan du enkelt integrera anslutningsappen och migrera dina befintliga Spark-jobb genom att helt enkelt uppdatera formatparametern med com.microsoft.sqlserver.jdbc.spark.

Om du vill inkludera anslutningsappen i dina projekt laddar du ned den här lagringsplatsen och skapar jar-filen med hjälp av SBT.

Skriva till en ny SQL-tabell

Varning

Läget overwrite släpper först tabellen om den redan finns i databasen som standard. Använd det här alternativet med försiktighet för att undvika oväntade dataförluster.

När du använder läget overwrite om du inte använder alternativet truncate för att återskapa tabellen går index förlorade. skulle en columnstore-tabell nu vara en hög. Om du vill behålla befintlig indexering anger du även alternativet truncate med värdet true. Till exempel .option("truncate","true").

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Lägg till i SQL-tabell

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Ange isoleringsnivå

Den här kontakten använder READ_COMMITTED som standardnivå för isolering vid massinföring i databasen. Om du vill åsidosätta isoleringsnivån använder du alternativet mssqlIsolationLevel :

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

Läsa från SQL-tabell

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Microsoft Entra-autentisering

Python-exempel med tjänstens huvudnamn

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Python-exempel med Active Directory-lösenord

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Ett obligatoriskt beroende måste installeras för att kunna autentisera med Hjälp av Active Directory.

Formatet user för när du använder ActiveDirectoryPassword ska vara UPN-formatet, till exempel username@domainname.com.

För Scala_com.microsoft.aad.adal4j_ måste artefakten installeras.

För Python_adal_ måste biblioteket installeras. Detta är tillgängligt via pip.

Kolla exempelanteckningsböckerna för att se exempel.

Besök GitHub-lagringsplatsen för SQL Spark-anslutningsappen.