Fråga databaser med JDBC
Azure Databricks stöder anslutning till externa databaser med JDBC. Den här artikeln innehåller grundläggande syntax för att konfigurera och använda dessa anslutningar med exempel i Python, SQL och Scala.
Viktigt!
De konfigurationer som beskrivs i den här artikeln är Experimentella. Experimentella funktioner tillhandahålls som de är och stöds inte av Databricks via teknisk kundsupport. För att få fullständigt stöd för frågefederation bör du i stället använda Lakehouse Federation, vilket gör att dina Azure Databricks-användare kan dra nytta av Unity Catalog-syntaxen och datastyrningsverktygen.
Partner Connect tillhandahåller optimerade integreringar för synkronisering av data med många externa datakällor. Se Vad är Databricks Partner Connect?.
Viktigt!
Exemplen i den här artikeln innehåller inte användarnamn och lösenord i JDBC-URL:er. Databricks rekommenderar att du använder hemligheter för att lagra dina databasautentiseringsuppgifter. Till exempel:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Om du vill referera till Databricks-hemligheter med SQL måste du konfigurera en Spark-konfigurationsegenskap under klusteriterilisering.
Ett fullständigt exempel på hemlig hantering finns i Exempel på hemligt arbetsflöde.
Läsa data med JDBC
Du måste konfigurera ett antal inställningar för att läsa data med JDBC. Observera att varje databas använder ett annat format för <jdbc-url>
.
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark läser automatiskt schemat från databastabellen och mappar tillbaka dess typer till Spark SQL-typer.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
Du kan köra frågor mot den här JDBC-tabellen:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Skriva data med JDBC
Om du sparar data i tabeller med JDBC används liknande konfigurationer för läsning. Se följande exempel:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Standardbeteendet försöker skapa en ny tabell och utlöser ett fel om det redan finns en tabell med det namnet.
Du kan lägga till data i en befintlig tabell med hjälp av följande syntax:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Du kan skriva över en befintlig tabell med hjälp av följande syntax:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Kontrollera parallellitet för JDBC-frågor
Som standard frågar JDBC-drivrutinen källdatabasen med endast en enda tråd. För att förbättra prestanda för läsningar måste du ange ett antal alternativ för att styra hur många samtidiga frågor Azure Databricks gör till databasen. För små kluster säkerställer inställningen numPartitions
av alternativet lika med antalet körkärnor i klustret att alla noder kör frågor mot data parallellt.
Varning
Om du anger numPartitions
ett högt värde för ett stort kluster kan det leda till negativa prestanda för fjärrdatabasen, eftersom för många samtidiga frågor kan överbelasta tjänsten. Detta är särskilt besvärligt för programdatabaser. Var försiktig med att ange det här värdet över 50.
Kommentar
Snabba upp frågor genom att välja en kolumn med ett index beräknat i källdatabasen partitionColumn
för .
I följande kodexempel visas hur du konfigurerar parallellitet för ett kluster med åtta kärnor:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Kommentar
Azure Databricks stöder alla Apache Spark-alternativ för att konfigurera JDBC.
När du skriver till databaser med JDBC använder Apache Spark antalet partitioner i minnet för att kontrollera parallellitet. Du kan partitionera om data innan du skriver för att kontrollera parallellitet. Undvik ett stort antal partitioner i stora kluster för att undvika att överbelasta fjärrdatabasen. I följande exempel visas ompartitionering till åtta partitioner innan du skriver:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Skicka en fråga till databasmotorn
Du kan push-överföra en hel fråga till databasen och bara returnera resultatet. Parametern table
identifierar JDBC-tabellen som ska läsas. Du kan använda allt som är giltigt i en SQL-frågesats FROM
.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Kontrollera antalet rader som hämtas per fråga
JDBC-drivrutiner har en fetchSize
parameter som styr antalet rader som hämtas åt gången från fjärrdatabasen.
Inställning | Result |
---|---|
För lågt | Hög svarstid på grund av många tur- och returresor (få rader returneras per fråga) |
För högt | Fel om slut på minne (för mycket data returneras i en fråga) |
Det optimala värdet är arbetsbelastningsberoende. Här är några saker att tänka på:
- Hur många kolumner returneras av frågan?
- Vilka datatyper returneras?
- Hur länge returneras strängarna i varje kolumn?
System kan ha mycket liten standard och dra nytta av justering. Till exempel: Oracles standard fetchSize
är 10. Om du ökar den till 100 minskar antalet totala frågor som måste köras med en faktor på 10. JDBC-resultat är nätverkstrafik, så undvik mycket stora tal, men optimala värden kan vara i tusental för många datauppsättningar.
Använd alternativet fetchSize
, som i följande exempel:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()