Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Azure Databricks unterstützt die Verbindung zu externen Datenbanken mithilfe von JDBC. Dieser Artikel enthält die grundlegende Syntax zum Konfigurieren und Verwenden dieser Verbindungen mit Beispielen in Python, SQL und Scala.
Wichtig
Die Legacy-Abfrageverbunddokumentation wurde eingestellt und kann nicht aktualisiert werden. Die in diesem Inhalt genannten Konfigurationen werden nicht offiziell von Databricks unterstützt oder getestet. Wenn Lakehouse Federation Ihre Quelldatenbank unterstützt, empfiehlt Databricks stattdessen die Verwendung.
Partner Connect bietet optimierte Integrationen zum Synchronisieren von Daten mit vielen externen Datenquellen. Siehe Was ist Databricks Partner Connect?.
Wichtig
Die Beispiele in diesem Artikel enthalten keine Benutzernamen und Kennwörter in JDBC-URLs. Databricks empfiehlt die Verwendung geheimer Schlüssel zum Speichern Ihrer Datenbankanmeldeinformationen. Beispiel:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Um auf Databricks-Schlüssel mit SQL zu verweisen, müssen Sie während der Clusterinitilisierung eine Spark-Konfigurationseigenschaft konfigurieren.
Ein vollständiges Beispiel für die geheime Verwaltung finden Sie im Lernprogramm: Erstellen und Verwenden eines Geheimschlüssels für Databricks.
Lesen von Daten mit JDBC
Sie müssen eine Reihe von Einstellungen konfigurieren, um Daten mithilfe von JDBC lesen zu können. Beachten Sie, dass jede Datenbank ein anderes Format für die <jdbc-url>
verwendet.
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark liest das Schema automatisch aus der Datenbanktabelle und ordnet die zugehörigen Typen zurück zu Spark SQL-Typen zu.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
Sie können Abfragen gegen diese JDBC-Tabelle ausführen.
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Schreiben von Daten mit JDBC
Das Speichern von Daten in Tabellen mit JDBC verwendet ähnliche Konfigurationen wie beim Lesen. Sehen Sie sich das folgende Beispiel an:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Das Standardverhalten versucht, eine neue Tabelle zu erstellen und löst einen Fehler aus, wenn bereits eine Tabelle mit diesem Namen vorhanden ist.
Sie können Daten mithilfe der folgenden Syntax an eine vorhandene Tabelle anfügen:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Sie können eine vorhandene Tabelle mit der folgenden Syntax überschreiben:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Steuern der Parallelität für JDBC-Abfragen
Standardmäßig fragt der JDBC-Treiber die Quelldatenbank nur mit einem einzigen Thread ab. Um die Leistung für Lesevorgänge zu verbessern, müssen Sie eine Reihe von Optionen angeben, um zu steuern, wie viele gleichzeitige Abfragen Azure Databricks an Ihrer Datenbank durchführen. Bei kleinen Clustern stellt das Festlegen der numPartitions
Option gleich der Anzahl der Executorkerne in Ihrem Cluster sicher, dass alle Knoten Daten parallel abfragen.
Warnung
Das Festlegen von numPartitions
auf einen hohen Wert für einen großen Cluster kann zu einer schlechten Leistung der Remotedatenbank führen, da zu viele gleichzeitige Abfragen den Dienst überlasten könnten. Dies ist besonders problematisch für Anwendungsdatenbanken. Seien Sie vorsichtig, diesen Wert über 50 festzulegen.
Hinweis
Beschleunigen Sie Abfragen, indem Sie eine Spalte auswählen, die einen Index hat, der in der Quelldatenbank für die partitionColumn
berechnet wurde.
Im folgenden Codebeispiel wird die Konfiguration der Parallelität für einen Cluster mit acht Kernen veranschaulicht:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Hinweis
Azure Databricks unterstützt alle Apache Spark-Optionen zum Konfigurieren von JDBC.
Beim Schreiben in Datenbanken mithilfe von JDBC verwendet Apache Spark die Anzahl der Partitionen im Speicher, um die Parallelität zu steuern. Sie können Daten neu partitionieren, bevor Sie diese schreiben, um die Parallelität zu steuern. Vermeiden Sie eine hohe Anzahl von Partitionen auf großen Clustern, um Ihre Remotedatenbank nicht zu überlasten. Im folgenden Beispiel wird die Neupartitionierung auf acht Partitionen vor dem Schreiben veranschaulicht:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Weitergeben einer Abfrage an die Datenbank-Engine
Sie können eine gesamte Abfrage in die Datenbank drücken und nur das Ergebnis zurückgeben. Der table
Parameter identifiziert die JDBC-Tabelle, die gelesen werden soll. Sie können alles verwenden, was in einer SQL-Abfrageklausel FROM
gültig ist.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Kontrolle über die Anzahl der pro Abfrage abgerufenen Zeilen
JDBC-Treiber verfügen über einen fetchSize
Parameter, der die Anzahl der Zeilen steuert, die gleichzeitig aus der Remotedatenbank abgerufen werden.
Konfiguration | Ergebnis |
---|---|
Zu niedrig | Hohe Latenz aufgrund vieler Rundenfahrten (wenige Zeilen werden pro Abfrage zurückgegeben) |
Zu hoch | Speicherfehler (zu viele Daten in einer Abfrage) |
Der optimale Wert ist arbeitsabhängig. Zu den Überlegungen gehören:
- Wie viele Spalten werden von der Abfrage zurückgegeben?
- Welche Datentypen werden zurückgegeben?
- Wie lang sind die Zeichenfolgen in jeder zurückgegebenen Spalte?
Systeme haben möglicherweise sehr kleine Standardeinstellungen und profitieren von der Optimierung. Beispiel: Oracles Standard fetchSize
ist 10. Das Erhöhen auf 100 reduziert die Anzahl der Gesamtabfragen, die mit einem Faktor von 10 ausgeführt werden müssen. JDBC-Ergebnisse sind Netzwerkdatenverkehr, vermeiden Sie also sehr große Zahlen, aber optimale Werte könnten bei vielen Datensätzen im Tausend-Bereich liegen.
Verwenden Sie die fetchSize
Option wie im folgenden Beispiel:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()