Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Azure Databricks admite la conexión a bases de datos externas mediante JDBC. En este artículo se proporciona la sintaxis básica para configurar y usar estas conexiones con ejemplos en Python, SQL y Scala.
Importante
La documentación de federación de consultas heredada ha quedado obsoleta y ya no se actualizará. Las configuraciones mencionadas en este contenido no están aprobadas o probadas oficialmente por Databricks. Si Lakehouse Federation admite la base de datos de origen, Databricks recomienda usarlo en su lugar.
Partner Connect proporciona integraciones optimizadas para sincronizar datos con muchos orígenes de datos externos externos. Consulte ¿Qué es Databricks Partner Connect?
Importante
Los ejemplos de este artículo no incluyen nombres de usuario y contraseñas en direcciones URL de JDBC. Databricks recomienda usar secretos para almacenar las credenciales de la base de datos. Por ejemplo:
Pitón
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Para hacer referencia a secretos de Databricks con SQL, debe configurar una propiedad de configuración de Spark durante la initilización del clúster.
Para obtener un ejemplo completo de administración de secretos, consulte Tutorial: Creación y uso de un secreto de Databricks.
Leer datos con JDBC
Debe configurar una serie de opciones para leer datos mediante JDBC. Tenga en cuenta que cada base de datos usa un formato diferente para .<jdbc-url>
Pitón
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark lee automáticamente el esquema de la tabla de base de datos y asigna sus tipos a los tipos de Spark SQL.
Pitón
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
Puede ejecutar consultas en esta tabla JDBC:
Pitón
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Escritura de datos con JDBC
Guardar datos en tablas con JDBC usa configuraciones similares para leer. Vea el ejemplo siguiente:
Pitón
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
El comportamiento predeterminado intenta crear una nueva tabla y produce un error si ya existe una tabla con ese nombre.
Puede anexar datos a una tabla existente mediante la sintaxis siguiente:
Pitón
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Puede sobrescribir una tabla existente mediante la sintaxis siguiente:
Pitón
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Control del paralelismo para consultas JDBC
De forma predeterminada, el controlador JDBC consulta la base de datos de origen con solo un único subproceso. Para mejorar el rendimiento de las lecturas, debe especificar una serie de opciones para controlar cuántas consultas simultáneas realiza Azure Databricks en la base de datos. Para clústeres pequeños, establecer la numPartitions
opción igual al número de núcleos del ejecutor en el clúster garantiza que todos los nodos consultan datos en paralelo.
Advertencia
Establecer numPartitions
en un valor alto en un clúster grande puede dar lugar a un rendimiento negativo para la base de datos remota, ya que demasiadas consultas simultáneas pueden sobrecargar el servicio. Esto es especialmente problemático para las bases de datos de aplicaciones. Tenga cuidado de establecer este valor por encima de 50.
Nota:
Acelere las consultas seleccionando una columna con un índice calculado en la base de datos de origen para partitionColumn
.
En el ejemplo de código siguiente se muestra cómo configurar paralelismo para un clúster con ocho núcleos:
Pitón
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Nota:
Azure Databricks admite todas las opciones de Apache Spark para configurar JDBC.
Al escribir en bases de datos mediante JDBC, Apache Spark usa el número de particiones en memoria para controlar el paralelismo. Puede reparticionar los datos antes de escribir para controlar el paralelismo. Evite un gran número de particiones en clústeres grandes para evitar sobrecargar la base de datos remota. El siguiente ejemplo demuestra cómo reparticionar en ocho particiones antes de escribir.
Pitón
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Transmisión de una consulta al motor de base de datos
Puede insertar una consulta completa en la base de datos y devolver solo el resultado. El table
parámetro identifica la tabla JDBC que se va a leer. Puede usar todo lo que sea válido en una cláusula de consulta FROM
SQL.
Pitón
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Controlar el número de filas obtenidas por consulta
Los controladores JDBC tienen un fetchSize
parámetro que controla el número de filas capturadas a la vez desde la base de datos remota.
Configuración | Resultado |
---|---|
Demasiado bajo | Latencia alta debido a muchos recorridos de ida y vuelta (pocas filas devueltas por consulta) |
Demasiado alto | Error de memoria insuficiente (demasiados datos devueltos en una consulta) |
El valor óptimo depende de la carga de trabajo. Entre las consideraciones se incluyen las siguientes:
- ¿Cuántas columnas devuelve la consulta?
- ¿Qué tipos de datos se devuelven?
- ¿Cuánto tiempo tienen las cadenas de texto devueltas por cada columna?
Los sistemas pueden tener un valor predeterminado muy pequeño y beneficiarse del ajuste. Por ejemplo: el valor predeterminado fetchSize
de Oracle es 10. Aumentarlo a 100 reduce el número de consultas totales que deben ejecutarse en un factor de 10. Los resultados de JDBC son el tráfico de red, por lo que evite números muy grandes, pero los valores óptimos podrían estar en los miles de conjuntos de datos.
Use la fetchSize
opción , como en el ejemplo siguiente:
Pitón
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()