Abfragen von Datenbanken mit JDBC

Azure Databricks unterstützt die Verbindung zu externen Datenbanken mithilfe von JDBC. Dieser Artikel enthält die grundlegende Syntax zum Konfigurieren und Verwenden dieser Verbindungen mit Beispielen in Python, SQL und Scala.

Hinweis

Möglicherweise bevorzugen Sie Lakehouse Federation für die Verwaltung von Abfragen von externen Datenbanksystemen. Weitere Informationen unter Was ist Lakehouse Federation.

Partner Connect bietet optimierte Integrationen für die Synchronisierung von Daten mit vielen externen Datenquellen. Siehe Was ist Databricks Partner Connect?.

Wichtig

Die Beispiele in diesem Artikel enthalten keine Benutzernamen und Kennwörter in JDBC-URLs. Databricks empfiehlt die Verwendung von Geheimnissen zum Speichern Ihrer Datenbankanmeldeinformationen. Beispiele:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Um auf Databricks-Geheimnisse zu verweisen, müssen Sie während der Clusterinitilisierung eine Spark-Konfigurationseigenschaft konfigurieren.

Ein vollständiges Beispiel für die Geheimnisverwaltung finden Sie unter Beispiel für einen Geheimnisworkflow.

Lesen von Daten mit JDBC

Sie müssen eine Reihe von Einstellungen konfigurieren, um Daten mithilfe von JDBC lesen zu können. Beachten Sie, dass jede Datenbank ein anderes Format für die <jdbc-url> verwendet.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark liest das Schema automatisch aus der Datenbanktabelle und ordnet dessen Typen den Spark SQL-Typen zu.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Sie können Abfragen für diese JDBC-Tabelle ausführen:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Schreiben von Daten mit JDBC

Beim Speichern von Daten in Tabellen mit JDBC werden ähnliche Konfigurationen wie beim Lesen verwendet. Siehe folgendes Beispiel:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Beim Standardverhalten wird versucht, eine neuen Tabelle zu erstellen, und es wird eine Fehlermeldung ausgegeben, wenn bereits eine Tabelle mit diesem Namen vorhanden ist.

Sie können Daten mithilfe der folgenden Syntax an eine vorhandene Tabelle anfügen:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Sie können eine vorhandene Tabelle mithilfe der folgenden Syntax überschreiben:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Steuern des Parallelismus für JDBC-Abfragen

Standardmäßig fragt der JDBC-Treiber die Quelldatenbank nur mit einem einzigen Thread ab. Um die Leistung für Lesevorgänge zu verbessern, müssen Sie eine Reihe von Optionen angeben, um zu steuern, wie viele gleichzeitige Abfragen Azure Databricks an Ihre Datenbank macht. Bei kleinen Clustern wird durch das Festlegen der numPartitions-Option bei gleicher Anzahl von Executorkernen in Ihrem Cluster sichergestellt, dass alle Knoten Daten parallel abfragen.

Warnung

Die Einstellung von numPartitions auf einen hohen Wert in einem großen Cluster kann zu einer negativen Leistung in der Remotedatenbank führen, da der Dienst durch zu viele gleichzeitige Abfragen überlastet werden könnte. Dies ist besonders problematisch bei Anwendungsdatenbanken. Legen Sie diese Einstellung besser nicht auf einen Wert über 50 fest.

Hinweis

Beschleunigen Sie Abfragen, indem Sie eine Spalte mit einem Index auswählen, der in der Quelldatenbank für die partitionColumn berechnet wurde.

Im folgenden Codebeispiel wird die Konfiguration des Parallelismus für einen Cluster mit acht Kernen veranschaulicht:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Hinweis

Azure Databricks unterstützt alle Apache Spark -Optionen zum Konfigurieren von JDBC.

Beim Schreiben in Datenbanken mit JDBC verwendet Apache Spark die Anzahl der Partitionen im Arbeitsspeicher, um die Parallelität zu steuern. Sie können Daten neu partitionieren, bevor Sie zum Steuern der Parallelität schreiben. Vermeiden Sie eine hohe Anzahl von Partitionen auf großen Clustern, um zu vermeiden, dass Ihre Remotedatenbank überlastet wird. Im folgenden Beispiel wird die Neupartitionierung auf acht Partitionen vor dem Schreiben veranschaulicht:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Weitergeben einer Abfrage an die Datenbank-Engine

Sie können eine gesamte Abfrage an die Datenbank weitergeben und nur das Ergebnis zurückgeben. Der Parameter table ermittelt die zu lesende JDBC-Tabelle. Jedes gültige Format der SQL-Abfrageklausel FROM ist zulässig.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Steuern der Anzahl der pro Abfrage abgerufenen Zeilen

JDBC-Treiber verfügen über einen fetchSize-Parameter, der die Anzahl der Zeilen steuert, die gleichzeitig aus der JDBC-Remotedatenbank abgerufen werden.

Einstellung Ergebnis
Zu niedrig Hohe Latenz aufgrund vieler Roundtrips (wenige Zeilen, die pro Abfrage zurückgegeben werden)
Zu hoch Speicherfehler (zu viele Daten, die in einer Abfrage zurückgegeben werden)

Der optimale Wert ist abhängig von der Arbeitsauslastung. Diese Aspekte spielen eine Rolle:

  • Wie viele Spalten werden von der Abfrage zurückgegeben?
  • Welche Datentypen werden zurückgegeben?
  • Wie lange werden die Zeichenfolgen in jeder Spalte zurückgegeben?

Systeme haben möglicherweise sehr geringe Standardeinstellungen und profitieren von der Optimierung. Beispiel: Die Standardeinstellung fetchSize von Oracle ist 10. Durch das Erhöhen auf 100 wird die Anzahl der Gesamtabfragen reduziert, die mit dem Faktor 10 ausgeführt werden müssen. JDBC-Ergebnisse sind Netzwerkdatenverkehr; vermeiden Sie daher sehr große Zahlen, aber optimale Werte können bei vielen Datasets in den Tausenden liegen.

Verwenden Sie wie im folgenden Beispiel die Option fetchSize:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()