Megosztás a következőn keresztül:


JDBC-t használó adatbázisok lekérdezése

Az Azure Databricks támogatja a külső adatbázisokhoz való csatlakozást A JDBC használatával. Ez a cikk a Kapcsolatok konfigurálásának és használatának alapvető szintaxisát ismerteti a Python, az SQL és a Scala példáival.

Fontos

A cikkben ismertetett konfigurációk kísérleti jellegűek. A kísérleti funkciókat a Databricks jelenleg is biztosítja, és a Databricks nem támogatja az ügyfél technikai támogatásával. A lekérdezések összevonásának teljes körű támogatásához inkább a Lakehouse Federationt kell használnia, amely lehetővé teszi az Azure Databricks-felhasználók számára, hogy kihasználhassák a Unity Catalog szintaxisát és adatszabályozási eszközeit.

A Partner Connect optimalizált integrációkat biztosít az adatok számos külső adatforrással való szinkronizálásához. Lásd : Mi az a Databricks Partner Connect?.

Fontos

A cikkben szereplő példák nem tartalmaznak felhasználóneveket és jelszavakat a JDBC URL-címeiben. A Databricks azt javasolja, hogy titkos kulcsokat használjon az adatbázis hitelesítő adatainak tárolásához. Példa:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

A Databricks-titkos kódok SQL-hez való hivatkozásához konfigurálnia kell egy Spark-konfigurációs tulajdonságot a fürt initilizálása során.

A titkos kulcsok kezelésének teljes példáját lásd a titkos munkafolyamat példájában.

Adatok olvasása a JDBC-vel

Több beállítást kell konfigurálnia az adatok JDBC használatával való olvasásához. Vegye figyelembe, hogy minden adatbázis más formátumot használ a <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

A Spark automatikusan beolvassa a sémát az adatbázistáblából, és a típusokat a Spark SQL-típusokhoz rendeli vissza.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Lekérdezéseket futtathat ezen a JDBC-táblán:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Adatok írása A JDBC használatával

Az adatok JDBC-vel való mentése hasonló konfigurációkat használ az olvasáshoz. Lásd a következő példát:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Az alapértelmezett viselkedés új táblázatot próbál létrehozni, és hibát jelez, ha már létezik ilyen nevű tábla.

Egy meglévő táblához az alábbi szintaxissal fűzhet hozzá adatokat:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Egy meglévő táblát az alábbi szintaxissal írhat felül:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Párhuzamosság szabályozása JDBC-lekérdezésekhez

Alapértelmezés szerint a JDBC-illesztő csak egyetlen szálal lekérdezi a forrásadatbázist. Az olvasási teljesítmény javítása érdekében számos lehetőséget kell megadnia annak szabályozásához, hogy az Azure Databricks hány egyidejű lekérdezést végez az adatbázison. Kis fürtök esetén a numPartitions fürt végrehajtó magjainak számával egyenlő beállítással biztosítható, hogy az összes csomópont párhuzamosan kérdezi le az adatokat.

Figyelmeztetés

Ha nagy méretű fürtön állít be numPartitions magas értéket, az negatív teljesítményt eredményezhet a távoli adatbázis számára, mivel túl sok egyidejű lekérdezés túlterhelheti a szolgáltatást. Ez különösen zavaró az alkalmazás-adatbázisok esetében. Ügyeljen arra, hogy ezt az értéket 50 fölé állítsa.

Feljegyzés

A lekérdezések felgyorsítása a forrásadatbázisban partitionColumnkiszámított indexet tartalmazó oszlop kiválasztásával.

Az alábbi példakód bemutatja, hogyan konfigurálható a párhuzamosság egy nyolc maggal rendelkező fürthöz:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Feljegyzés

Az Azure Databricks támogatja az Összes Apache Spark-beállítást a JDBC konfigurálásához.

Amikor JDBC-vel ír adatbázisba, az Apache Spark a memóriában lévő partíciók számát használja a párhuzamosság szabályozásához. A párhuzamosság szabályozásához írás előtt újrapartitálhatja az adatokat. A távoli adatbázis túlterhelésének elkerülése érdekében kerülje a nagy méretű fürtök nagy számú partícióját. Az alábbi példa nyolc partícióra való újraparticionálást mutat be írás előtt:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Lekérdezés leküldése az adatbázismotorba

Leküldhet egy teljes lekérdezést az adatbázisba, és csak az eredményt adja vissza. A table paraméter azonosítja az olvasni kívánt JDBC-táblát. Az SQL-lekérdezési FROM záradékban érvényes bármit használhat.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Lekérdezésenként lekért sorok számának szabályozása

A JDBC-illesztőprogramok olyan fetchSize paraméterrel rendelkeznek, amely a távoli adatbázisból egyszerre beolvasott sorok számát szabályozza.

Beállítás Eredmény
Túl alacsony Nagy késés sok kerekítés miatt (lekérdezésenként néhány sor visszaadva)
Túl magas Memóriahiba (túl sok adat egy lekérdezésben)

Az optimális érték a számítási feladatoktól függ. Megfontolandó szempontok:

  • Hány oszlopot ad vissza a lekérdezés?
  • Milyen adattípusokat ad vissza?
  • Mennyi ideig vannak visszaadva az egyes oszlopok sztringjei?

Előfordulhat, hogy a rendszerek alapértelmezett értéke nagyon kicsi, és a finomhangolás is előnyös lehet. Például: Az Oracle alapértelmezett értéke fetchSize 10. Ha 100-ra növeli azt, az összes lekérdezés számát csökkenti, amelyet 10-es tényezővel kell végrehajtani. A JDBC-eredmények hálózati forgalomnak számítanak, ezért kerülje a nagyon nagy számokat, de az optimális értékek sok adathalmaz esetében ezres nagyságrendben lehetnek.

Használja a fetchSize lehetőséget, ahogyan az alábbi példában is látható:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()