Megosztás a következőn keresztül:


Apache Spark-összekötő: SQL Server és Azure SQL

Az SQL Serverhez és az Azure SQL-hez készült Apache Spark-összekötő egy nagy teljesítményű összekötő, amellyel tranzakciós adatokat vehet fel a big data-elemzésekbe, és megőrizheti az alkalmi lekérdezések vagy jelentések eredményeit. Az összekötő használatával bármilyen SQL-adatbázist használhat a helyszínen vagy a felhőben, a Spark-feladatok bemeneti adatforrásaként vagy kimeneti adatgyűjtőjeként.

Megjegyzés:

Ez az összekötő nincs aktívan karbantartva. Ez a cikk csak archiválási célból marad meg.

Ez a kódtár az Sql Serverhez és az Azure SQL-platformokhoz készült Apache Spark Connector forráskódját tartalmazza.

Az Apache Spark egy egységes elemzési motor a nagy léptékű adatfeldolgozáshoz.

Az összekötő két verziója érhető el a Mavenen keresztül: egy 2.4.x-kompatibilis és egy 3.0.x-kompatibilis verzióval. Töltse le az összekötőket maven.org és importálja őket koordinátákkal:

Csatlakozó Maven koordináta
Spark 2.4.x kompatibilis összekötő com.microsoft.azure:spark-mssql-connector:1.0.2
Spark 3.0.x kompatibilis összekötő com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Spark 3.1.x kompatibilis összekötő com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

Az összekötőt forrásból is létrehozhatja, vagy letöltheti a JAR-t a GitHub Kiadás szakaszából. Az összekötőről további információt az SQL Spark-összekötő GitHub-adattárában talál.

Támogatott funkciók

  • Az összes Spark-kötés (Scala, Python, R) támogatása
  • Alapszintű hitelesítés és Active Directory (AD) billentyűlap támogatása
  • Újrarendezett írástámogatás
  • Az önálló SQL Server-példányba és Adatkészletbe való írás támogatása az SQL Server Big Data-fürtökben
  • Megbízható összekötők támogatása önálló SQL Server-példányhoz
Összetevő Támogatott verziók
Apache Spark 2.4.x, 3.0.x, 3.1.x
Scala 2.11, 2.12
Microsoft JDBC-illesztőprogram SQL Serverhez 8.4
Microsoft SQL Server SQL Server 2008 vagy újabb
Azure SQL Database-adatbázisok Támogatott

Támogatott beállítások

Az SQL Serverhez és az Azure SQL-hez készült Apache Spark Connector támogatja az SQL DataSource JDBC-cikkben meghatározott beállításokat.

Az összekötő emellett a következő lehetőségeket is támogatja:

Lehetőség Alapértelmezett Leírás
reliabilityLevel BEST_EFFORT BEST_EFFORT vagy NO_DUPLICATES. NO_DUPLICATES megbízható beszúrást valósít meg az újraindítási forgatókönyvek esetén végrehajtó környezetben
dataPoolDataSource none none azt jelenti, hogy az érték nincs beállítva, és az összekötőnek egyetlen SQL Server-példányba kell írnia. Állítsa ezt az értéket adatforrásnévre, hogy adatkészlettáblát írjon a Big Data-fürtökben.
isolationLevel READ_COMMITTED Az elkülönítési szint megadása
tableLock false Az TABLOCK opcióval történő beszúrás végrehajtása az írási teljesítmény javítása érdekében
schemaCheckEnabled true Letiltja a szigorú adatkeret- és SQL-táblaséma-ellenőrzést hamis értékre állítva

Állítsa be a többi tömeges másolási beállítást a dataframe lehetőségeknél. A csatlakozó ezeket a beállításokat az bulkcopy API-knak továbbítja íráskor.

Teljesítmény-összehasonlítás

Az SQL Serverhez és az Azure SQL-hez készült Apache Spark Connector akár 15-ször gyorsabb, mint az általános JDBC-összekötő az SQL Serverre való íráshoz. A teljesítmény jellemzői típustól, adatmennyiségtől, a használt lehetőségektől és az egyes futtatások közötti eltérésektől függően változhatnak. A következő teljesítményeredmények azt a időt mutatják, ami ahhoz szükséges, hogy a 143,9 millió sorból álló SQL-táblát felülírjuk egy Spark környezetben.dataframe A spark dataframe a store_sales használatával létrehozott HDFS-tábla olvasásával jön létre. Az store_sales és dataframe közötti olvasási idő ki van zárva. A három futtatás eredményeit átlagolják.

Összekötő típusa Beállítások Leírás Írási idő
JDBCConnector Alapértelmezett Általános JDBC-összekötő alapértelmezett beállításokkal 1385 másodperc
sql-spark-connector BEST_EFFORT Legjobb erőfeszítés alapértelmezett beállításokkal sql-spark-connector 580 másodperc
sql-spark-connector NO_DUPLICATES Megbízható sql-spark-connector 709 másodperc
sql-spark-connector BEST_EFFORT + tabLock=true A legjobb erőfeszítés a sql-spark-connector táblazárolás engedélyezésével 72 másodperc
sql-spark-connector NO_DUPLICATES + tabLock=true Megbízható sql-spark-connector , táblazárolás engedélyezve 198 másodperc

Konfigurálás

  • Spark-konfiguráció: num_executors = 20, executor_memory = '1664 m', executor_cores = 2
  • Data Gen konfiguráció: léptéktényező=50, particionált_táblák=igaz
  • Adatfájl store_sales 143 997 590 sorszámmal

Környezet

  • SQL Server Big Data klaszter CU5
  • master + 6 csomópont
  • Minden csomópont egy Gen-5-kiszolgáló, 512 GB RAM,csomópontonként 4 TB NVM és 10 Gb/s hálózati adapter

Gyakori problémák

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

Ez a hiba akkor fordul elő, ha az mssql illesztőprogram egy régebbi verzióját használja a Hadoop-környezetben. A csatlakozó most már tartalmazza ezt az illesztőprogramot. Ha korábban az Azure SQL Connectort használta, és manuálisan telepített illesztőprogramokat a fürtre a Microsoft Entra-hitelesítés kompatibilitása érdekében, távolítsa el ezeket az illesztőprogramokat.

A hiba kijavítása:

  1. Ha általános Hadoop-környezetet használ, ellenőrizze és távolítsa el a mssql JAR-t a következő paranccsal: rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar. Ha Databricks-et használ, vegyen fel egy globális vagy fürtbeli init-szkriptet az mssql illesztőprogram régi verzióinak eltávolításához a /databricks/jars mappából, vagy adja hozzá ezt a sort egy meglévő szkripthez: rm /databricks/jars/*mssql*

  2. Adja hozzá a adal4j és mssql csomagokat. Használhatja például a Mavent, de bármilyen módon működnie kell.

    Figyelmeztetés

    Ne telepítse így az SQL Spark-összekötőt.

  3. Adja hozzá az illesztőprogram-osztályt a kapcsolatkonfigurációhoz. Például:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

További információért tekintse meg a https://github.com/microsoft/sql-spark-connector/issues/26 megfejtését.

Első lépések

Az SQL Serverhez és az Azure SQL-hez készült Apache Spark Connector a Spark DataSourceV1 API-n és az SQL Server Bulk API-n alapul. Ugyanazt a felületet használja, mint a beépített JDBC Spark-SQL összekötő. Ezzel az integrációval egyszerűen integrálhatja az összekötőt, és migrálhatja a meglévő Spark-feladatokat a formátumparaméter frissítésével com.microsoft.sqlserver.jdbc.spark.

Az összekötő projektekbe való belefoglalásához töltse le ezt az adattárat, és hozza létre a JAR-t az SBT használatával.

Írás új SQL-táblába

Figyelmeztetés

A overwrite mód először elveti a táblát, ha már létezik az adatbázisban. Ezzel a beállítással elkerülheti a váratlan adatvesztést.

Ha a overwrite módot truncate nélkül használja a táblát újraépítésekor, a művelet eltávolítja az indexeket. Emellett az oszlopos tábla halomtáblává alakul. A meglévő indexek megtartásához állítsa a truncate beállítást a következőre: true. Például: .option("truncate","true").

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Hozzáfűzés az SQL-táblához

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Az elkülönítési szint megadása

Ez az összekötő alapértelmezés szerint az READ_COMMITTED elkülönítési szintet használja, amikor tömegesen szúr be adatokat az adatbázisba. Az elkülönítési szint felülbírálásához használja a mssqlIsolationLevel opciót:

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

Olvasás SQL-táblából

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Microsoft Entra-hitelesítés

Python példa szolgáltatási fő azonosítóval

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Python-példa Active Directory-jelszóval

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Az Active Directory használatával történő hitelesítéshez telepítse a szükséges függőséget.

Ha használja ActiveDirectoryPassword, az user értéknek UPN formátumban kell lennie, például username@domainname.com.

Scala esetén telepítse az összetevőtcom.microsoft.aad.adal4j.

Python esetén telepítse a kódtáratadal. Ez a kódtár pipen keresztül érhető el.

Példákért tekintse meg a mintajegyzetfüzeteket.