Delen via


Apache Spark connector: SQL Server en Azure SQL

De Apache Spark-connector voor SQL Server en Azure SQL is een krachtige connector die u kunt gebruiken om transactionele gegevens te integreren in big data-analyses en resultaten te bewaren voor ad-hoc query's of rapportages. Met behulp van de connector kunt u elke SQL-database, on-premises of in de cloud gebruiken als invoergegevensbron of uitvoergegevenssink voor Spark-taken.

Opmerking

Deze connector wordt niet actief onderhouden. Dit artikel wordt alleen bewaard voor archiveringsdoeleinden.

Deze bibliotheek bevat de broncode voor de Apache Spark Connector voor SQL Server- en Azure SQL-platformen.

Apache Spark is een geïntegreerde analyse-engine voor gegevensverwerking op grote schaal.

Er zijn twee versies van de connector beschikbaar via Maven: een 2.4.x-compatibele versie en een 3.0.x-compatibele versie. Download de connectors uit maven.org en importeer ze met behulp van coördinaten:

Verbinder Maven-coördinaat
Compatibele Spark 2.4.x-connector com.microsoft.azure:spark-mssql-connector:1.0.2
Compatibele Spark 3.0.x-connector com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Compatibele Spark 3.1.x-connector com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

U kunt de connector ook bouwen vanuit de bron of de JAR downloaden uit de sectie Release in GitHub. Zie de GitHub-opslagplaats van de SQL Spark-connector voor de meest recente informatie over de connector.

Ondersteunde functies

  • Ondersteuning voor alle Spark-bindingen (Scala, Python, R)
  • Ondersteuning voor basisverificatie en Active Directory (AD) sleuteltabblad
  • Ondersteuning voor opnieuw ordenen van schrijfbewerkingen dataframe
  • Ondersteuning voor schrijven naar SQL Server Single Instance en Data Pool in SQL Server Big Data Clusters
  • Betrouwbare connectorondersteuning voor Sql Server Single Instance
Onderdeel Ondersteunde versies
Apache Spark 2.4.x, 3.0.x, 3.1.x
Scala 2.11, 2.12
Microsoft JDBC-stuurprogramma voor SQL Server 8.4
Microsoft SQL Server SQL Server 2008 of hoger
Azure SQL Databases Ondersteund

Ondersteunde opties

De Apache Spark-connector voor SQL Server en Azure SQL ondersteunt de opties die zijn gedefinieerd in het artikel SQL DataSource JDBC .

Daarnaast ondersteunt de connector de volgende opties:

Optie Verstek Beschrijving
reliabilityLevel BEST_EFFORT BEST_EFFORT of NO_DUPLICATES. NO_DUPLICATES implementeert een betrouwbare insert in scenario's voor opnieuw opstarten van uitvoerders
dataPoolDataSource none none impliceert dat de waarde niet is ingesteld en dat de connector naar één exemplaar van SQL Server moet schrijven. Stel deze waarde in op de naam van de gegevensbron om een gegevensgroeptabel te schrijven in Big Data-clusters.
isolationLevel READ_COMMITTED Het isolatieniveau opgeven
tableLock false Implementeert een insert met TABLOCK de optie om schrijfprestaties te verbeteren
schemaCheckEnabled true Schakelt strikte gegevensframe- en SQL-tabelschemacontrole uit wanneer deze is ingesteld op onwaar

Stel andere opties voor bulksgewijs kopiëren in als opties op de dataframe. De connector geeft deze opties door aan bulkcopy API's bij schrijven.

Prestatievergelijking

Apache Spark Connector voor SQL Server en Azure SQL is maximaal 15x sneller dan een algemene JDBC-connector voor schrijven naar SQL Server. Prestatiekenmerken verschillen per type, hoeveelheid gegevens, gebruikte opties en kunnen variaties tussen elke uitvoering tonen. De volgende prestatieresultaten zijn de tijd die nodig is om een SQL-tabel te overschrijven met 143,9M-rijen in een spark dataframe. De Spark dataframe wordt opgebouwd door het lezen van de HDFS-tabel store_sales die is gegenereerd met behulp van Spark TPCDS Benchmark. De tijd om te lezen van store_sales tot dataframe is uitgesloten. De resultaten worden gemiddeld berekend over drie runs.

Verbindingslijntype Opties Beschrijving Tijd om te schrijven
JDBCConnector Verstek Algemene JDBC-connector met standaardopties 1385 seconden
sql-spark-connector BEST_EFFORT Beste poging sql-spark-connector met standaardopties 580 seconden
sql-spark-connector NO_DUPLICATES Betrouwbaar sql-spark-connector 709 seconden
sql-spark-connector BEST_EFFORT + tabLock=true Beste poging sql-spark-connector met ingeschakelde tabelvergrendeling 72 seconden
sql-spark-connector NO_DUPLICATES + tabLock=true Betrouwbaar sql-spark-connector met tabelvergrendeling ingeschakeld 198 seconden

Configuratie

  • Spark-configuratie: num_executors = 20, executor_memory = '1664 MB', executor_cores = 2
  • Data Gen-configuratie: scale_factor = 50, partitioned_tables = true
  • Gegevensbestand store_sales met het aantal rijen 143.997.590

Milieu

Veelvoorkomende problemen

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

Deze fout treedt op wanneer u een oudere versie van het mssql stuurprogramma in uw Hadoop-omgeving gebruikt. De connector bevat nu dit stuurprogramma. Als u de Azure SQL Connector eerder hebt gebruikt en stuurprogramma's handmatig hebt geïnstalleerd op uw cluster voor compatibiliteit met Microsoft Entra-verificatie, verwijdert u deze stuurprogramma's.

Ga als volgt te werk om de fout op te lossen:

  1. Als u een algemene Hadoop-omgeving gebruikt, controleert en verwijdert u het mssql JAR met de volgende opdracht: rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar Als u Databricks gebruikt, voegt u een globaal of cluster-init-script toe om oude versies van het mssql stuurprogramma uit de /databricks/jars map te verwijderen of voegt u deze regel toe aan een bestaand script: rm /databricks/jars/*mssql*

  2. Voeg de adal4j en mssql pakketten toe. U kunt bijvoorbeeld Maven gebruiken, maar elke manier moet werken.

    Waarschuwing

    Installeer de SQL Spark-connector niet op deze manier.

  3. Voeg de stuurprogrammaklasse toe aan uw verbindingsconfiguratie. Voorbeeld:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

Voor meer informatie, zie de oplossing voor https://github.com/microsoft/sql-spark-connector/issues/26.

Aan de slag

De Apache Spark-connector voor SQL Server en Azure SQL is gebaseerd op de Spark DataSourceV1-API en de BULK-API van SQL Server. Deze maakt gebruik van dezelfde interface als de ingebouwde JDBC-Spark-SQL-connector. Met deze integratie kunt u de connector eenvoudig integreren en uw bestaande Spark-taken migreren door de indelingsparameter bij te werken met com.microsoft.sqlserver.jdbc.spark.

Als u de connector in uw projecten wilt opnemen, downloadt u deze opslagplaats en bouwt u het JAR met behulp van SBT.

Schrijven naar een nieuwe SQL-tabel

Waarschuwing

In overwrite de modus wordt de tabel eerst verwijderd als deze al bestaat in de database. Gebruik deze optie met zorg om onverwacht gegevensverlies te voorkomen.

Als u de modus overwrite zonder de optie truncate gebruikt bij het opnieuw maken van de tabel, verwijdert de bewerking indexen. Ook wordt een columnstore-tabel gewijzigd in een heap-tabel. Als u bestaande indexen wilt behouden, stelt u de truncate optie in op true. Bijvoorbeeld: .option("truncate","true").

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Toevoegen aan SQL-tabel

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Het isolatieniveau opgeven

Deze connector maakt standaard gebruik van het READ_COMMITTED isolatieniveau wanneer er bulksgewijs gegevens in de database worden ingevoegd. Als u het isolatieniveau wilt overschrijven, gebruikt u de mssqlIsolationLevel volgende optie:

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

Lezen uit SQL-tabel

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Microsoft Entra-authenticatie

Python-voorbeeld met service-principal

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Python-voorbeeld met Active Directory-wachtwoord

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Installeer de vereiste afhankelijkheid om te authenticeren met behulp van Active Directory.

Wanneer u ActiveDirectoryPassword gebruikt, moet de waarde van user in de UPN-indeling zijn, zoals username@domainname.com.

Installeer het artefact voor com.microsoft.aad.adal4j.

Installeer de bibliotheek voor adal. Deze bibliotheek is beschikbaar via pip.

Zie de voorbeeldnotitieblokken voor voorbeelden.