Compartilhar via


Consultar bancos de dados usando o JDBC

O Azure Databricks dá suporte à conexão a bancos de dados externos usando o JDBC. Este artigo fornece a sintaxe básica para configuração e uso dessas conexões com exemplos no Python, SQL e Scala.

Importante

As configurações descritas nesse artigo são Experimentais. Recursos experimentais são fornecidos no estado em que se encontram e não têm suporte do Databricks por meio do suporte técnico para clientes. Para obter suporte completo à federação de consultas, você deve usar a Federação do Lakehouse, que permite que os usuários do Azure Databricks aproveitem a sintaxe do Catálogo do Unity e as ferramentas de governança de dados.

O Partner Connect fornece integrações otimizadas para sincronizar dados com muitas fontes de dados externas. Consulte O que é o Databricks Partner Connect?.

Importante

Os exemplos deste artigo não incluem nomes de arquivo nem senhas em URLs do JDBC. O Databricks recomenda o uso de segredos para armazenar suas credenciais de banco de dados. Por exemplo:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Para fazer referência aos segredos do Databricks com o SQL, você deve configurar uma propriedade de configuração do Spark durante a inicialização do cluster.

Para obter um exemplo completo de gerenciamento de segredos, confira Exemplo de fluxo de trabalho de segredos.

Leitura de dados com o JDBC

Você deve definir várias configurações para ler dados usando JDBC. Observe que cada banco de dados usa um formato diferente para <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

O Spark lê automaticamente o esquema da tabela de banco de dados e mapeia os tipos novamente para os tipos Spark SQL.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Você pode executar consultas nessa tabela JDBC:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Gravação de dados com o JDBC

Ao salvar dados em tabelas com o JDBC, configurações semelhantes são usadas para leitura. Consulte o seguinte exemplo:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

O comportamento padrão tenta criar uma tabela e gera um erro se já existir uma tabela com o mesmo nome.

Você pode acrescentar dados a uma tabela existente usando a seguinte sintaxe:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Você pode substituir uma tabela existente usando a seguinte sintaxe:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Controlar o paralelismo nas consultas JDBC

Por padrão, o driver JDBC consulta o banco de dados de origem com apenas um único thread. Para melhorar o desempenho das leituras, você precisa especificar várias opções para controlar quantas consultas simultâneas o Azure Databricks faz ao banco de dados. Para clusters pequenos, a definição da opção numPartitions igual ao número de núcleos executores em seu cluster garante que todos os nós consultem dados em paralelo.

Aviso

A configuração de numPartitions como um valor alto em um cluster grande pode resultar em desempenho negativo para o banco de dados remoto, pois muitas consultas simultâneas podem sobrecarregar o serviço. Isso é especialmente problemático nos bancos de dados de aplicativos. Tenha cuidado ao definir esse valor acima de 50.

Observação

Acelere as consultas selecionando uma coluna com um índice calculado no banco de dados de origem para partitionColumn.

O exemplo de código a seguir demonstra a configuração de paralelismo para um cluster com oito núcleos:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Observação

O Azure Databricks dá suporte a todas as opções do Apache Spark para configurar o JDBC.

Ao gravar em bancos de dados usando o JDBC, o Apache Spark usa o número de partições na memória para controlar o paralelismo. Você pode reparticionar dados antes de gravar para controlar o paralelismo. Evite um alto número de partições em clusters grandes para evitar sobrecarregar o banco de dados remoto. O exemplo a seguir demonstra o reparticionamento para oito partições antes da gravação:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Efetuar pushdown de uma consulta por push para o mecanismo de banco de dados

Você pode efetuar pushdown de uma consulta inteira para o banco de dados e retornar apenas o resultado. O parâmetro table identifica a tabela JDBC a ser lida. Você pode usar qualquer coisa que seja válida em uma cláusula FROM de consulta SQL.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Número de controle de linhas buscadas por consulta

Os drivers JDBC têm um parâmetro fetchSize que controla o número de linhas buscadas por vez do banco de dados remoto.

Configuração Result
Muito baixo Alta latência devido a muitas viagens de ida e volta (poucas linhas retornadas por consulta)
Muito alto Erro de memória insuficiente (muitos dados retornados em uma consulta)

O valor ideal depende da carga de trabalho. Entre as considerações estão:

  • Quantas colunas são retornadas pela consulta?
  • Quais tipos de dados são retornados?
  • Por quanto tempo as cadeias de caracteres em cada coluna são retornadas?

Os sistemas podem ter um padrão muito pequeno e se beneficiar do ajuste. Por exemplo: o padrão fetchSize do Oracle é 10. Aumentar para 100 reduz o número de consultas totais que precisam ser executadas por um fator de 10. Os resultados JDBC são tráfego, portanto, evite números muito grandes, mas os valores ideais podem ser milhares para muitos conjuntos de dados.

Use a opção fetchSize, como no exemplo a seguir:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()