JDBC-t használó adatbázisok lekérdezése
Az Azure Databricks támogatja a külső adatbázisokhoz való csatlakozást A JDBC használatával. Ez a cikk a Kapcsolatok konfigurálásának és használatának alapvető szintaxisát ismerteti a Python, az SQL és a Scala példáival.
Fontos
A cikkben ismertetett konfigurációk kísérleti jellegűek. A kísérleti funkciókat a Databricks jelenleg is biztosítja, és a Databricks nem támogatja az ügyfél technikai támogatásával. A lekérdezések összevonásának teljes körű támogatásához inkább a Lakehouse Federationt kell használnia, amely lehetővé teszi az Azure Databricks-felhasználók számára, hogy kihasználhassák a Unity Catalog szintaxisát és adatszabályozási eszközeit.
A Partner Connect optimalizált integrációkat biztosít az adatok számos külső adatforrással való szinkronizálásához. Lásd : Mi az a Databricks Partner Connect?.
Fontos
A cikkben szereplő példák nem tartalmaznak felhasználóneveket és jelszavakat a JDBC URL-címeiben. A Databricks azt javasolja, hogy titkos kulcsokat használjon az adatbázis hitelesítő adatainak tárolásához. Példa:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
A Databricks-titkos kódok SQL-hez való hivatkozásához konfigurálnia kell egy Spark-konfigurációs tulajdonságot a fürt initilizálása során.
A titkos kulcsok kezelésének teljes példáját lásd a titkos munkafolyamat példájában.
Adatok olvasása a JDBC-vel
Több beállítást kell konfigurálnia az adatok JDBC használatával való olvasásához. Vegye figyelembe, hogy minden adatbázis más formátumot használ a <jdbc-url>
.
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
A Spark automatikusan beolvassa a sémát az adatbázistáblából, és a típusokat a Spark SQL-típusokhoz rendeli vissza.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
Lekérdezéseket futtathat ezen a JDBC-táblán:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Adatok írása A JDBC használatával
Az adatok JDBC-vel való mentése hasonló konfigurációkat használ az olvasáshoz. Lásd a következő példát:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Az alapértelmezett viselkedés új táblázatot próbál létrehozni, és hibát jelez, ha már létezik ilyen nevű tábla.
Egy meglévő táblához az alábbi szintaxissal fűzhet hozzá adatokat:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Egy meglévő táblát az alábbi szintaxissal írhat felül:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Párhuzamosság szabályozása JDBC-lekérdezésekhez
Alapértelmezés szerint a JDBC-illesztő csak egyetlen szálal lekérdezi a forrásadatbázist. Az olvasási teljesítmény javítása érdekében számos lehetőséget kell megadnia annak szabályozásához, hogy az Azure Databricks hány egyidejű lekérdezést végez az adatbázison. Kis fürtök esetén a numPartitions
fürt végrehajtó magjainak számával egyenlő beállítással biztosítható, hogy az összes csomópont párhuzamosan kérdezi le az adatokat.
Figyelmeztetés
Ha nagy méretű fürtön állít be numPartitions
magas értéket, az negatív teljesítményt eredményezhet a távoli adatbázis számára, mivel túl sok egyidejű lekérdezés túlterhelheti a szolgáltatást. Ez különösen zavaró az alkalmazás-adatbázisok esetében. Ügyeljen arra, hogy ezt az értéket 50 fölé állítsa.
Feljegyzés
A lekérdezések felgyorsítása a forrásadatbázisban partitionColumn
kiszámított indexet tartalmazó oszlop kiválasztásával.
Az alábbi példakód bemutatja, hogyan konfigurálható a párhuzamosság egy nyolc maggal rendelkező fürthöz:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Feljegyzés
Az Azure Databricks támogatja az Összes Apache Spark-beállítást a JDBC konfigurálásához.
Amikor JDBC-vel ír adatbázisba, az Apache Spark a memóriában lévő partíciók számát használja a párhuzamosság szabályozásához. A párhuzamosság szabályozásához írás előtt újrapartitálhatja az adatokat. A távoli adatbázis túlterhelésének elkerülése érdekében kerülje a nagy méretű fürtök nagy számú partícióját. Az alábbi példa nyolc partícióra való újraparticionálást mutat be írás előtt:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Lekérdezés leküldése az adatbázismotorba
Leküldhet egy teljes lekérdezést az adatbázisba, és csak az eredményt adja vissza. A table
paraméter azonosítja az olvasni kívánt JDBC-táblát. Az SQL-lekérdezési FROM
záradékban érvényes bármit használhat.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Lekérdezésenként lekért sorok számának szabályozása
A JDBC-illesztőprogramok olyan fetchSize
paraméterrel rendelkeznek, amely a távoli adatbázisból egyszerre beolvasott sorok számát szabályozza.
Beállítás | Eredmény |
---|---|
Túl alacsony | Nagy késés sok kerekítés miatt (lekérdezésenként néhány sor visszaadva) |
Túl magas | Memóriahiba (túl sok adat egy lekérdezésben) |
Az optimális érték a számítási feladatoktól függ. Megfontolandó szempontok:
- Hány oszlopot ad vissza a lekérdezés?
- Milyen adattípusokat ad vissza?
- Mennyi ideig vannak visszaadva az egyes oszlopok sztringjei?
Előfordulhat, hogy a rendszerek alapértelmezett értéke nagyon kicsi, és a finomhangolás is előnyös lehet. Például: Az Oracle alapértelmezett értéke fetchSize
10. Ha 100-ra növeli azt, az összes lekérdezés számát csökkenti, amelyet 10-es tényezővel kell végrehajtani. A JDBC-eredmények hálózati forgalomnak számítanak, ezért kerülje a nagyon nagy számokat, de az optimális értékek sok adathalmaz esetében ezres nagyságrendben lehetnek.
Használja a fetchSize
lehetőséget, ahogyan az alábbi példában is látható:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()