Abfragen von Datenbanken mit JDBC
Azure Databricks unterstützt die Verbindung zu externen Datenbanken mithilfe von JDBC. Dieser Artikel enthält die grundlegende Syntax zum Konfigurieren und Verwenden dieser Verbindungen mit Beispielen in Python, SQL und Scala.
Wichtig
Die in diesem Artikel beschriebenen Konfigurationen sind experimentell. Experimentelle Funktionen werden ohne Mängelgewähr („wie besehen“) bereitgestellt und nicht von Databricks über Kanäle des technischen Kundensupports unterstützt. Um vollständige Unterstützung für den Abfrageverbund zu erhalten, sollten Sie stattdessen Lakehouse-Verbund verwenden, wodurch Ihre Azure Databricks-Benutzer die Vorteile der Unity Catalog-Syntax und der Datengovernancetools nutzen können.
Partner Connect bietet optimierte Integrationen für die Synchronisierung von Daten mit vielen externen Datenquellen. Siehe Was ist Databricks Partner Connect?.
Wichtig
Die Beispiele in diesem Artikel enthalten keine Benutzernamen und Kennwörter in JDBC-URLs. Databricks empfiehlt die Verwendung von Geheimnissen zum Speichern Ihrer Datenbankanmeldeinformationen. Beispiele:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Um auf Databricks-Geheimnisse zu verweisen, müssen Sie während der Clusterinitilisierung eine Spark-Konfigurationseigenschaft konfigurieren.
Ein vollständiges Beispiel für die Geheimnisverwaltung finden Sie unter Beispiel für einen Geheimnisworkflow.
Lesen von Daten mit JDBC
Sie müssen eine Reihe von Einstellungen konfigurieren, um Daten mithilfe von JDBC lesen zu können. Beachten Sie, dass jede Datenbank ein anderes Format für die <jdbc-url>
verwendet.
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark liest das Schema automatisch aus der Datenbanktabelle und ordnet dessen Typen den Spark SQL-Typen zu.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
Sie können Abfragen für diese JDBC-Tabelle ausführen:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Schreiben von Daten mit JDBC
Beim Speichern von Daten in Tabellen mit JDBC werden ähnliche Konfigurationen wie beim Lesen verwendet. Siehe folgendes Beispiel:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Beim Standardverhalten wird versucht, eine neuen Tabelle zu erstellen, und es wird eine Fehlermeldung ausgegeben, wenn bereits eine Tabelle mit diesem Namen vorhanden ist.
Sie können Daten mithilfe der folgenden Syntax an eine vorhandene Tabelle anfügen:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Sie können eine vorhandene Tabelle mithilfe der folgenden Syntax überschreiben:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Steuern des Parallelismus für JDBC-Abfragen
Standardmäßig fragt der JDBC-Treiber die Quelldatenbank nur mit einem einzigen Thread ab. Um die Leistung für Lesevorgänge zu verbessern, müssen Sie eine Reihe von Optionen angeben, um zu steuern, wie viele gleichzeitige Abfragen Azure Databricks an Ihre Datenbank macht. Bei kleinen Clustern wird durch das Festlegen der numPartitions
-Option bei gleicher Anzahl von Executorkernen in Ihrem Cluster sichergestellt, dass alle Knoten Daten parallel abfragen.
Warnung
Die Einstellung von numPartitions
auf einen hohen Wert in einem großen Cluster kann zu einer negativen Leistung in der Remotedatenbank führen, da der Dienst durch zu viele gleichzeitige Abfragen überlastet werden könnte. Dies ist besonders problematisch bei Anwendungsdatenbanken. Legen Sie diese Einstellung besser nicht auf einen Wert über 50 fest.
Hinweis
Beschleunigen Sie Abfragen, indem Sie eine Spalte mit einem Index auswählen, der in der Quelldatenbank für die partitionColumn
berechnet wurde.
Im folgenden Codebeispiel wird die Konfiguration des Parallelismus für einen Cluster mit acht Kernen veranschaulicht:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Hinweis
Azure Databricks unterstützt alle Apache Spark -Optionen zum Konfigurieren von JDBC.
Beim Schreiben in Datenbanken mit JDBC verwendet Apache Spark die Anzahl der Partitionen im Arbeitsspeicher, um die Parallelität zu steuern. Sie können Daten neu partitionieren, bevor Sie zum Steuern der Parallelität schreiben. Vermeiden Sie eine hohe Anzahl von Partitionen auf großen Clustern, um zu vermeiden, dass Ihre Remotedatenbank überlastet wird. Im folgenden Beispiel wird die Neupartitionierung auf acht Partitionen vor dem Schreiben veranschaulicht:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Weitergeben einer Abfrage an die Datenbank-Engine
Sie können eine gesamte Abfrage an die Datenbank weitergeben und nur das Ergebnis zurückgeben. Der Parameter table
ermittelt die zu lesende JDBC-Tabelle. Jedes gültige Format der SQL-Abfrageklausel FROM
ist zulässig.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Steuern der Anzahl der pro Abfrage abgerufenen Zeilen
JDBC-Treiber verfügen über einen fetchSize
-Parameter, der die Anzahl der Zeilen steuert, die gleichzeitig aus der JDBC-Remotedatenbank abgerufen werden.
Einstellung | Ergebnis |
---|---|
Zu niedrig | Hohe Latenz aufgrund vieler Roundtrips (wenige Zeilen, die pro Abfrage zurückgegeben werden) |
Zu hoch | Speicherfehler (zu viele Daten, die in einer Abfrage zurückgegeben werden) |
Der optimale Wert ist abhängig von der Arbeitsauslastung. Diese Aspekte spielen eine Rolle:
- Wie viele Spalten werden von der Abfrage zurückgegeben?
- Welche Datentypen werden zurückgegeben?
- Wie lange werden die Zeichenfolgen in jeder Spalte zurückgegeben?
Systeme haben möglicherweise sehr geringe Standardeinstellungen und profitieren von der Optimierung. Beispiel: Die Standardeinstellung fetchSize
von Oracle ist 10. Durch das Erhöhen auf 100 wird die Anzahl der Gesamtabfragen reduziert, die mit dem Faktor 10 ausgeführt werden müssen. JDBC-Ergebnisse sind Netzwerkdatenverkehr; vermeiden Sie daher sehr große Zahlen, aber optimale Werte können bei vielen Datasets in den Tausenden liegen.
Verwenden Sie wie im folgenden Beispiel die Option fetchSize
:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()