Interroger des bases de données en utilisant JDBC
Azure Databricks prend en charge la connexion à des bases de données externes en utilisant JDBC. Cet article fournit la syntaxe de base pour la configuration et l’utilisation de ces connexions avec des exemples dans Python, SQL et Scala.
Important
Les configurations décrites dans cet article sont Expérimentales. Les fonctionnalités expérimentales sont fournies en l’état et ne sont pas prises en charge par Databricks via le support technique client. Pour bénéficier d’une prise en charge complète de la fédération de requêtes, vous devez plutôt utiliser Lakehouse Federation, qui permet à vos utilisateurs Azure Databricks de profiter de la syntaxe Unity Catalog et des outils de gouvernance des données.
Partner Connect fournit des intégrations optimisées pour la synchronisation des données avec de nombreuses sources de données externes. Voir Qu’est-ce que Databricks Partner Connect ?.
Important
Les exemples de cet article n’incluent pas les noms d’utilisateur et les mots de passe dans les URL JDBC. Databricks recommande d’utiliser des secrets pour stocker vos informations d’identification de base de données. Par exemple :
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Pour référencer des secrets Databricks avec SQL, vous devez configurer une propriété de configuration Spark lors de l’initialisation du cluster.
Pour obtenir un exemple complet de la gestion des secrets, consultez Exemple de workflow des secrets.
Lecture des données avec JDBC
Vous devez configurer un certain nombre de paramètres pour lire des données à l’aide de JDBC. Notez que chaque base de données utilise un format différent pour le <jdbc-url>
.
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark lit automatiquement le schéma de la table de base de données et, en retour, il mappe ses types à des types Spark SQL.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
Vous pouvez exécuter des requêtes sur cette table JDBC :
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Écriture des données avec JDBC
L’enregistrement de données dans des tableaux avec JDBC utilise des configurations similaires à la lecture. Voir l’exemple suivant :
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Le comportement par défaut consiste à créer un nouveau tableau et à générer un message d’erreur si un tableau du même nom existe déjà.
Vous pouvez ajouter des données à un tableau existant à l’aide de la syntaxe suivante :
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Vous pouvez remplacer un tableau existant en utilisant la syntaxe suivante :
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Parallélisme de contrôle pour les requêtes JDBC
Par défaut, le pilote JDBC interroge la base de données source avec un seul thread. Pour améliorer les performances des lectures, vous devez spécifier un certain nombre d’options pour contrôler le nombre de requêtes simultanées effectuées par Azure Databricks dans votre base de données. Pour les petits clusters, paramétrer que l’option numPartitions
égale au nombre de cœurs d’exécuteur de votre cluster garantit que tous les nœuds interrogent des données en parallèle.
Avertissement
Paramétrer numPartitions
à une valeur élevée sur un cluster volumineux peut entraîner des performances négatives pour la base de données distante, car trop de requêtes simultanées peuvent surcharger le service. Cela est particulièrement problématique pour les bases de données d’application. Soyez prudent de paramétrer cette valeur au delà de 50.
Notes
Accélérez les requêtes en sélectionnant une colonne avec un index calculé dans la base de données source pour partitionColumn
.
L’exemple de code suivant illustre la configuration du parallélisme pour un cluster avec huit cœurs :
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Notes
Azure Databricks prend en charge toutes les options Apache Spark pour la configuration de JDBC.
Lors de l’écriture dans des bases de données à l’aide de JDBC, Apache Spark utilise le nombre de partitions en mémoire pour contrôler le parallélisme. Vous pouvez répartir les données avant d’écrire pour contrôler le parallélisme. Évitez un nombre élevé de partitions sur des clusters volumineux pour éviter d’surcharger votre base de données distante. L’exemple suivant illustre le repartitionnement sur huit partitions avant d’écrire :
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Pousser une requête vers le moteur de base de données
Vous pouvez pousser une requête entière vers la base de données et retourner uniquement le résultat. Le paramètre table
identifie la table JDBC à lire. Vous pouvez utiliser tout élément valide dans la clause FROM
d’une requête SQL.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Nombre de lignes extraites par requête
Les pilotes JDBC ont un paramètre fetchSize
qui contrôle le nombre de lignes pouvant être récupérées simultanément de la base de données distante.
Paramètre | Résultats |
---|---|
Trop faible | Latence élevée due à de nombreux allers-retours (quelques lignes renvoyées par requête) |
Trop élevé | Erreur de mémoire insuffisante (trop de données renvoyées dans une requête) |
La valeur optimale dépend de la charge de travail. Éléments à prendre en compte :
- Combien de colonnes sont renvoyées par la requête ?
- Quels types de données sont retournés ?
- Combien de temps les chaînes de chaque colonne sont-elles renvoyées ?
Les systèmes peuvent avoir un tout petit défaut et bénéficier du réglage. Par exemple : La valeur par défaut fetchSize
d’Oracle est 10. L’augmentation à 100 réduit le nombre total de requêtes qui doivent être exécutées par un facteur de 10. Les résultats JDBC sont le trafic réseau, donc évitez de très grands nombres, mais les valeurs optimales peuvent atteindre les milliers pour de nombreux jeux de données.
Utilisez l’option fetchSize
, comme dans l’exemple suivant :
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()