Oktatás
Modul
Teljesítmény optimalizálása Spark- és Delta-élő táblákkal - Training
Optimalizálja a teljesítményt a Spark és a Delta Live Tables használatával az Azure Databricksben.
Ezt a böngészőt már nem támogatjuk.
Frissítsen a Microsoft Edge-re, hogy kihasználhassa a legújabb funkciókat, a biztonsági frissítéseket és a technikai támogatást.
Az Azure Databricks támogatja a külső adatbázisokhoz való csatlakozást A JDBC használatával. Ez a cikk az connections konfigurálásának és használatának alapvető szintaxisát ismerteti a Python, az SQL és a Scala példáival.
Fontos
A cikkben ismertetett konfigurációk kísérleti jellegűek. A kísérleti funkciókat a Databricks jelenleg is biztosítja, és a Databricks nem támogatja az ügyfél technikai támogatásával. A lekérdezési összevonás teljes támogatásának get ehelyett Lakehouse Federationkell használnia, amely lehetővé teszi, hogy az Azure Databricks-felhasználók kihasználhassák a Unity Catalog szintaxisát és adatszabályozási eszközeit.
A Partner Connect optimalizált integrációkat biztosít az adatok számos külső adatforrással való szinkronizálásához. Lásd : Mi az a Databricks Partner Connect?.
Fontos
A cikkben szereplő példák nem tartalmaznak felhasználóneveket és jelszavakat a JDBC URL-címeiben. A Databricks titkos kulcsok használatát javasolja az adatbázis credentialstárolásához. Példa:
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
A Databricks-titkos kódok SQL-hez való hivatkozásához konfigurálnia kell egy Spark-konfigurációs tulajdonságot a fürt initilizálása során.
A titkos kódok kezelésére vonatkozó teljes példáért tekintse meg az oktatóanyagot: Databricks-titkos kód létrehozása és használata.
Több beállítást kell konfigurálnia az adatok JDBC használatával való olvasásához. Vegye figyelembe, hogy minden adatbázis más formátumot használ a <jdbc-url>
.
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
A Spark automatikusan beolvassa a schema-t az table adatbázisból, és a típusokat a Spark SQL-típusokra képezi le.
employees_table.printSchema
DESCRIBE employees_table_vw
employees_table.printSchema
Lekérdezéseket futtathat ezen a JDBC tablekapcsolaton:
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Az adatok a tables-ra JDBC-vel való mentése hasonló beállításokat alkalmaz, mint az olvasásnál. Lásd a következő példát:
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Az alapértelmezett viselkedés új table próbál létrehozni, és hibát jelez, ha már létezik ilyen nevű table.
Meglévő table-hoz az alábbi szintaxissal fűzhet hozzá adatokat:
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Egy meglévő table az alábbi szintaxissal írhatja felül:
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Alapértelmezés szerint a JDBC-illesztő csak egyetlen szálal lekérdezi a forrásadatbázist. Az olvasási teljesítmény javítása érdekében számos lehetőséget kell megadnia annak szabályozásához, hogy az Azure Databricks hány egyidejű lekérdezést végez az adatbázison. Kis fürtök esetén a numPartitions
fürt végrehajtó magjainak számával egyenlő beállítással biztosítható, hogy az összes csomópont párhuzamosan kérdezi le az adatokat.
Figyelmeztetés
Ha nagy méretű fürtön állít be numPartitions
magas értéket, az negatív teljesítményt eredményezhet a távoli adatbázis számára, mivel túl sok egyidejű lekérdezés túlterhelheti a szolgáltatást. Ez különösen zavaró az alkalmazás-adatbázisok esetében. Ügyeljen arra, hogy ezt az értéket 50 fölé állítsa.
Megjegyzés
A lekérdezések felgyorsítása a partitionColumn
forrásadatbázisában kiszámított indexet tartalmazó column kiválasztásával.
Az alábbi példakód bemutatja, hogyan konfigurálható a párhuzamosság egy nyolc maggal rendelkező fürthöz:
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Megjegyzés
Az Azure Databricks támogatja az Összes Apache Spark-beállítást a JDBC konfigurálásához.
Amikor JDBC-vel ír adatbázisba, az Apache Spark a memóriában lévő partíciók számát használja a párhuzamosság szabályozásához. A párhuzamosság szabályozásához írás előtt újrapartitálhatja az adatokat. A távoli adatbázis túlterhelésének elkerülése érdekében kerülje a nagy méretű fürtök nagy számú partícióját. Az alábbi példa nyolc partícióra való újraparticionálást mutat be írás előtt:
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Leküldhet egy teljes lekérdezést az adatbázisba, és csak az eredményt adja vissza. A table
paraméter azonosítja az olvasni kívánt JDBC-table. Az SQL-lekérdezési FROM
záradékban érvényes bármit használhat.
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
A JDBC-illesztőprogramok olyan fetchSize
paraméterrel rendelkeznek, amely a távoli adatbázisból egyszerre beolvasott sorok számát szabályozza.
Beállítás | Eredmény |
---|---|
Túl alacsony | Nagy késés sok kerekítés miatt (lekérdezésenként néhány sor visszaadva) |
Túl magas | Memóriahiba (túl sok adat egy lekérdezésben) |
Az optimális érték a számítási feladatoktól függ. Megfontolandó szempontok:
Előfordulhat, hogy a rendszerek alapértelmezett értéke nagyon kicsi, és a finomhangolás is előnyös lehet. Például: Az Oracle alapértelmezett értéke fetchSize
10. Ha 100-ra növeli azt, az összes lekérdezés számát csökkenti, amelyet 10-es tényezővel kell végrehajtani. A JDBC-eredmények hálózati forgalomnak minősülnek, ezért kerülje a nagyon nagy számokat, de az optimális values sok adathalmaz esetében ezres nagyságrendben lehetnek.
Használja a fetchSize
lehetőséget, ahogyan az alábbi példában is látható:
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
Oktatás
Modul
Teljesítmény optimalizálása Spark- és Delta-élő táblákkal - Training
Optimalizálja a teljesítményt a Spark és a Delta Live Tables használatával az Azure Databricksben.