Köra frågor mot databaser med JDBC

Azure Databricks stöder anslutning till externa databaser med JDBC. Den här artikeln innehåller grundläggande syntax för att konfigurera och använda dessa anslutningar med exempel i Python, SQL och Scala.

Kommentar

Du kanske föredrar Lakehouse Federation för att hantera frågor till externa databassystem. Se Vad är Lakehouse Federation.

Partner Anslut tillhandahåller optimerade integreringar för synkronisering av data med många externa datakällor. Se Vad är Databricks Partner Anslut?.

Viktigt!

Exemplen i den här artikeln innehåller inte användarnamn och lösenord i JDBC-URL:er. Databricks rekommenderar att du använder hemligheter för att lagra dina databasautentiseringsuppgifter. Till exempel:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Om du vill referera till Databricks-hemligheter med SQL måste du konfigurera en Spark-konfigurationsegenskap under klusteriterilisering.

Ett fullständigt exempel på hemlig hantering finns i Exempel på hemligt arbetsflöde.

Läsa data med JDBC

Du måste konfigurera ett antal inställningar för att läsa data med JDBC. Observera att varje databas använder ett annat format för <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark läser automatiskt schemat från databastabellen och mappar tillbaka dess typer till Spark SQL-typer.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Du kan köra frågor mot den här JDBC-tabellen:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Skriva data med JDBC

Om du sparar data i tabeller med JDBC används liknande konfigurationer för läsning. Se följande exempel:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Standardbeteendet försöker skapa en ny tabell och utlöser ett fel om det redan finns en tabell med det namnet.

Du kan lägga till data i en befintlig tabell med hjälp av följande syntax:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Du kan skriva över en befintlig tabell med hjälp av följande syntax:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Kontrollera parallellitet för JDBC-frågor

Som standard frågar JDBC-drivrutinen källdatabasen med endast en enda tråd. För att förbättra prestanda för läsningar måste du ange ett antal alternativ för att styra hur många samtidiga frågor Azure Databricks gör till databasen. För små kluster säkerställer inställningen numPartitions av alternativet lika med antalet körkärnor i klustret att alla noder kör frågor mot data parallellt.

Varning

Om du anger numPartitions ett högt värde för ett stort kluster kan det leda till negativa prestanda för fjärrdatabasen, eftersom för många samtidiga frågor kan överbelasta tjänsten. Detta är särskilt besvärligt för programdatabaser. Var försiktig med att ange det här värdet över 50.

Kommentar

Snabba upp frågor genom att välja en kolumn med ett index beräknat i källdatabasen partitionColumnför .

I följande kodexempel visas hur du konfigurerar parallellitet för ett kluster med åtta kärnor:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Kommentar

Azure Databricks stöder alla Apache Spark-alternativ för att konfigurera JDBC.

När du skriver till databaser med JDBC använder Apache Spark antalet partitioner i minnet för att kontrollera parallellitet. Du kan partitionera om data innan du skriver för att kontrollera parallellitet. Undvik ett stort antal partitioner i stora kluster för att undvika att överbelasta fjärrdatabasen. I följande exempel visas ompartitionering till åtta partitioner innan du skriver:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Skicka en fråga till databasmotorn

Du kan push-överföra en hel fråga till databasen och bara returnera resultatet. Parametern table identifierar JDBC-tabellen som ska läsas. Du kan använda allt som är giltigt i en SQL-frågesats FROM .

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Kontrollera antalet rader som hämtas per fråga

JDBC-drivrutiner har en fetchSize parameter som styr antalet rader som hämtas åt gången från fjärrdatabasen.

Inställning Resultat
För lågt Hög svarstid på grund av många tur- och returresor (få rader returneras per fråga)
För högt Fel om slut på minne (för mycket data returneras i en fråga)

Det optimala värdet är arbetsbelastningsberoende. Här är några saker att tänka på:

  • Hur många kolumner returneras av frågan?
  • Vilka datatyper returneras?
  • Hur länge returneras strängarna i varje kolumn?

System kan ha mycket liten standard och dra nytta av justering. Till exempel: Oracles standard fetchSize är 10. Om du ökar den till 100 minskar antalet totala frågor som måste köras med en faktor på 10. JDBC-resultat är nätverkstrafik, så undvik mycket stora tal, men optimala värden kan vara i tusental för många datauppsättningar.

Använd alternativet fetchSize , som i följande exempel:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()