Query's uitvoeren op databases met JDBC

Azure Databricks biedt ondersteuning voor het maken van verbinding met externe databases met behulp van JDBC. Dit artikel bevat de basissyntaxis voor het configureren en gebruiken van deze verbindingen met voorbeelden in Python, SQL en Scala.

Notitie

U kunt de voorkeur geven aan Lakehouse Federation voor het beheren van query's naar externe databasesystemen. Zie Wat is Lakehouse Federation.

Partner Verbinding maken biedt geoptimaliseerde integraties voor het synchroniseren van gegevens met veel externe gegevensbronnen. Zie Wat is Databricks Partner Verbinding maken?

Belangrijk

De voorbeelden in dit artikel bevatten geen gebruikersnamen en wachtwoorden in JDBC-URL's. Databricks raadt het gebruik van geheimen aan om uw databasereferenties op te slaan. Voorbeeld:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Als u wilt verwijzen naar Databricks-geheimen met SQL, moet u een Spark-configuratie-eigenschap configureren tijdens het gebruik van het cluster.

Zie het voorbeeld van een geheime werkstroom voor een volledig voorbeeld van geheimbeheer.

Gegevens lezen met JDBC

U moet een aantal instellingen configureren om gegevens te lezen met behulp van JDBC. Houd er rekening mee dat elke database een andere indeling voor de <jdbc-url>database gebruikt.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark leest het schema automatisch uit de databasetabel en wijst de typen weer toe aan Spark SQL-typen.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

U kunt query's uitvoeren op deze JDBC-tabel:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Gegevens schrijven met JDBC

Als u gegevens opslaat in tabellen met JDBC, worden vergelijkbare configuraties gebruikt om te lezen. Zie het volgende voorbeeld:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Het standaardgedrag probeert een nieuwe tabel te maken en genereert een fout als er al een tabel met die naam bestaat.

U kunt gegevens toevoegen aan een bestaande tabel met behulp van de volgende syntaxis:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

U kunt een bestaande tabel overschrijven met de volgende syntaxis:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Parallelle uitvoering van besturingselementen voor JDBC-query's

Standaard voert het JDBC-stuurprogramma een query uit op de brondatabase met slechts één thread. Als u de prestaties voor leesbewerkingen wilt verbeteren, moet u een aantal opties opgeven om te bepalen hoeveel gelijktijdige query's Azure Databricks in uw database maakt. Voor kleine clusters zorgt het instellen van de numPartitions optie gelijk aan het aantal uitvoerderskernen in uw cluster ervoor dat alle knooppunten parallel query's uitvoeren op gegevens.

Waarschuwing

Het instellen numPartitions op een hoge waarde op een groot cluster kan leiden tot negatieve prestaties voor de externe database, omdat te veel gelijktijdige query's de service kunnen overweldigen. Dit is vooral lastig voor toepassingsdatabases. Wees voorzichtig met het instellen van deze waarde boven de 50.

Notitie

Versnel query's door een kolom te selecteren met een index die is berekend in de brondatabase voor de partitionColumn.

In het volgende codevoorbeeld ziet u hoe u parallellisme configureert voor een cluster met acht kernen:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Notitie

Azure Databricks ondersteunt alle Apache Spark-opties voor het configureren van JDBC.

Bij het schrijven naar databases met behulp van JDBC gebruikt Apache Spark het aantal partities in het geheugen om parallelle uitvoering te beheren. U kunt gegevens opnieuw partitioneren voordat u schrijft om parallellisme te beheren. Vermijd een groot aantal partities op grote clusters om te voorkomen dat uw externe database wordt overstuurd. In het volgende voorbeeld ziet u hoe u opnieuw partitioneert naar acht partities voordat u gaat schrijven:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Een query naar de database-engine pushen

U kunt een volledige query naar de database pushen en alleen het resultaat retourneren. De table parameter identificeert de JDBC-tabel die moet worden gelezen. U kunt alles gebruiken wat geldig is in een SQL-querycomponent FROM .

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Het aantal rijen bepalen dat per query is opgehaald

JDBC-stuurprogramma's hebben een fetchSize parameter waarmee het aantal rijen wordt bepaald dat tegelijk is opgehaald uit de externe database.

Instelling Resultaat
Te laag Hoge latentie vanwege veel roundtrips (weinig rijen geretourneerd per query)
Te hoog Onvoldoende geheugenfout (te veel gegevens geretourneerd in één query)

De optimale waarde is afhankelijk van de werkbelasting. Overwegingen zijn onder andere:

  • Hoeveel kolommen worden door de query geretourneerd?
  • Welke gegevenstypen worden geretourneerd?
  • Hoe lang worden de tekenreeksen in elke kolom geretourneerd?

Systemen hebben mogelijk een zeer kleine standaardinstelling en profiteren van afstemming. Bijvoorbeeld: de standaardwaarde fetchSize van Oracle is 10. Als u dit verhoogt tot 100, wordt het aantal totale query's verminderd dat moet worden uitgevoerd met een factor 10. JDBC-resultaten zijn netwerkverkeer, dus vermijd zeer grote getallen, maar optimale waarden zijn mogelijk in de duizenden voor veel gegevenssets.

Gebruik de fetchSize optie, zoals in het volgende voorbeeld:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()