Freigeben über


Abfragen von Datenbanken mit JDBC

Azure Databricks unterstützt die Verbindung zu externen Datenbanken mithilfe von JDBC. Dieser Artikel enthält die grundlegende Syntax zum Konfigurieren und Verwenden dieser Verbindungen mit Beispielen in Python, SQL und Scala.

Wichtig

Die Legacy-Abfrageverbunddokumentation wurde eingestellt und kann nicht aktualisiert werden. Die in diesem Inhalt genannten Konfigurationen werden nicht offiziell von Databricks unterstützt oder getestet. Wenn Lakehouse Federation Ihre Quelldatenbank unterstützt, empfiehlt Databricks stattdessen die Verwendung.

Partner Connect bietet optimierte Integrationen zum Synchronisieren von Daten mit vielen externen Datenquellen. Siehe Was ist Databricks Partner Connect?.

Wichtig

Die Beispiele in diesem Artikel enthalten keine Benutzernamen und Kennwörter in JDBC-URLs. Databricks empfiehlt die Verwendung geheimer Schlüssel zum Speichern Ihrer Datenbankanmeldeinformationen. Beispiel:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Um auf Databricks-Schlüssel mit SQL zu verweisen, müssen Sie während der Clusterinitilisierung eine Spark-Konfigurationseigenschaft konfigurieren.

Ein vollständiges Beispiel für die geheime Verwaltung finden Sie im Lernprogramm: Erstellen und Verwenden eines Geheimschlüssels für Databricks.

Lesen von Daten mit JDBC

Sie müssen eine Reihe von Einstellungen konfigurieren, um Daten mithilfe von JDBC lesen zu können. Beachten Sie, dass jede Datenbank ein anderes Format für die <jdbc-url> verwendet.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark liest das Schema automatisch aus der Datenbanktabelle und ordnet die zugehörigen Typen zurück zu Spark SQL-Typen zu.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Sie können Abfragen gegen diese JDBC-Tabelle ausführen.

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Schreiben von Daten mit JDBC

Das Speichern von Daten in Tabellen mit JDBC verwendet ähnliche Konfigurationen wie beim Lesen. Sehen Sie sich das folgende Beispiel an:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Das Standardverhalten versucht, eine neue Tabelle zu erstellen und löst einen Fehler aus, wenn bereits eine Tabelle mit diesem Namen vorhanden ist.

Sie können Daten mithilfe der folgenden Syntax an eine vorhandene Tabelle anfügen:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Sie können eine vorhandene Tabelle mit der folgenden Syntax überschreiben:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Steuern der Parallelität für JDBC-Abfragen

Standardmäßig fragt der JDBC-Treiber die Quelldatenbank nur mit einem einzigen Thread ab. Um die Leistung für Lesevorgänge zu verbessern, müssen Sie eine Reihe von Optionen angeben, um zu steuern, wie viele gleichzeitige Abfragen Azure Databricks an Ihrer Datenbank durchführen. Bei kleinen Clustern stellt das Festlegen der numPartitions Option gleich der Anzahl der Executorkerne in Ihrem Cluster sicher, dass alle Knoten Daten parallel abfragen.

Warnung

Das Festlegen von numPartitions auf einen hohen Wert für einen großen Cluster kann zu einer schlechten Leistung der Remotedatenbank führen, da zu viele gleichzeitige Abfragen den Dienst überlasten könnten. Dies ist besonders problematisch für Anwendungsdatenbanken. Seien Sie vorsichtig, diesen Wert über 50 festzulegen.

Hinweis

Beschleunigen Sie Abfragen, indem Sie eine Spalte auswählen, die einen Index hat, der in der Quelldatenbank für die partitionColumn berechnet wurde.

Im folgenden Codebeispiel wird die Konfiguration der Parallelität für einen Cluster mit acht Kernen veranschaulicht:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Hinweis

Azure Databricks unterstützt alle Apache Spark-Optionen zum Konfigurieren von JDBC.

Beim Schreiben in Datenbanken mithilfe von JDBC verwendet Apache Spark die Anzahl der Partitionen im Speicher, um die Parallelität zu steuern. Sie können Daten neu partitionieren, bevor Sie diese schreiben, um die Parallelität zu steuern. Vermeiden Sie eine hohe Anzahl von Partitionen auf großen Clustern, um Ihre Remotedatenbank nicht zu überlasten. Im folgenden Beispiel wird die Neupartitionierung auf acht Partitionen vor dem Schreiben veranschaulicht:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Weitergeben einer Abfrage an die Datenbank-Engine

Sie können eine gesamte Abfrage in die Datenbank drücken und nur das Ergebnis zurückgeben. Der table Parameter identifiziert die JDBC-Tabelle, die gelesen werden soll. Sie können alles verwenden, was in einer SQL-Abfrageklausel FROM gültig ist.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Kontrolle über die Anzahl der pro Abfrage abgerufenen Zeilen

JDBC-Treiber verfügen über einen fetchSize Parameter, der die Anzahl der Zeilen steuert, die gleichzeitig aus der Remotedatenbank abgerufen werden.

Konfiguration Ergebnis
Zu niedrig Hohe Latenz aufgrund vieler Rundenfahrten (wenige Zeilen werden pro Abfrage zurückgegeben)
Zu hoch Speicherfehler (zu viele Daten in einer Abfrage)

Der optimale Wert ist arbeitsabhängig. Zu den Überlegungen gehören:

  • Wie viele Spalten werden von der Abfrage zurückgegeben?
  • Welche Datentypen werden zurückgegeben?
  • Wie lang sind die Zeichenfolgen in jeder zurückgegebenen Spalte?

Systeme haben möglicherweise sehr kleine Standardeinstellungen und profitieren von der Optimierung. Beispiel: Oracles Standard fetchSize ist 10. Das Erhöhen auf 100 reduziert die Anzahl der Gesamtabfragen, die mit einem Faktor von 10 ausgeführt werden müssen. JDBC-Ergebnisse sind Netzwerkdatenverkehr, vermeiden Sie also sehr große Zahlen, aber optimale Werte könnten bei vielen Datensätzen im Tausend-Bereich liegen.

Verwenden Sie die fetchSize Option wie im folgenden Beispiel:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()