共用方式為


使用 JDBC 查詢資料庫

Azure Databricks 支援使用 JDBC 連線到外部資料庫。 本文提供基本語法,以搭配 Python、SQL 和 Scala 中的範例來設定和使用這些連線。

重要

本文所述的設定為 實驗性。 實驗性功能是以目前方式提供,且透過客戶支援不支援 Databricks。 若要取得完整的查詢同盟支援,您應該改用 Lakehouse 同盟,這可讓您的 Azure Databricks 使用者利用 Unity 目錄語法和數據控管工具。

Partner Connect 提供優化整合,可將數據與許多外部數據源同步處理。 請參閱 什麼是 Databricks Partner Connect?

重要

本文中的範例不包含 JDBC URL 中的使用者名稱和密碼。 Databricks 建議使用 秘密 來儲存資料庫認證。 例如:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

若要使用 SQL 參考 Databricks 秘密,您必須 在叢集初始化期間設定 Spark 組態屬性。

如需秘密管理的完整範例,請參閱 秘密工作流程範例

使用 JDBC 讀取數據

您必須設定一些設定,以使用 JDBC 讀取數據。 請注意,每個資料庫都會針對 <jdbc-url>使用不同的格式。

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark 會自動從資料庫數據表讀取架構,並將其類型對應回 Spark SQL 類型。

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

您可以針對此 JDBC 資料表執行查詢:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

使用 JDBC 寫入數據

使用 JDBC 將資料儲存至資料表時,會使用類似的設定來讀取。 請參閱下列範例:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

默認行為會嘗試建立新的數據表,並在具有該名稱的數據表已經存在時擲回錯誤。

您可以使用下列語法將資料附加至現有的資料表:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

您可以使用下列語法覆寫現有的資料表:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

控制 JDBC 查詢的平行處理原則

根據預設,JDBC 驅動程式只會使用單一線程查詢源資料庫。 若要改善讀取的效能,您必須指定一些選項,以控制 Azure Databricks 對資料庫進行多少同時查詢。 針對小型叢集,將 選項設定 numPartitions 為等於叢集中的執行程式核心數目,可確保所有節點都會平行查詢數據。

警告

將 設定 numPartitions 為大型叢集上的高值可能會導致遠端資料庫的負面效能,因為太多同時查詢可能會使服務不堪重負。 對於應用程式資料庫來說,這特別麻煩。 請謹慎設定此值高於 50。

注意

選取在源資料庫中 partitionColumn計算之索引的數據行,以加速查詢。

下列程式代碼範例示範如何為具有八個核心的叢集設定平行處理原則:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

注意

Azure Databricks 支援所有 Apache Spark 選項來設定 JDBC

使用 JDBC 寫入資料庫時,Apache Spark 會使用記憶體中的數據分割數目來控制平行處理原則。 您可以先重新分割數據,再寫入以控制平行處理原則。 避免大型叢集上的大量分割區,以避免讓遠端資料庫壓倒性。 下列範例示範在寫入之前重新分割至八個分割區:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

將查詢向下推送至資料庫引擎

您可以將整個查詢向下推送至資料庫,並只傳回結果。 參數 table 會識別要讀取的 JDBC 資料表。 您可以使用任何在 SQL 查詢 FROM 子句中有效的專案。

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

控制每個查詢擷取的數據列數目

JDBC 驅動程式具有 fetchSize 參數,可控制一次從遠端資料庫擷取的數據列數目。

設定 結果
太低 由於往返次數過多而造成高延遲(每個查詢傳回的數據列很少)
太高 記憶體不足錯誤(在一個查詢中傳回太多資料)

最佳值取決於工作負載。 考量項目包括:

  • 查詢會傳回多少個數據行?
  • 傳回哪些數據類型?
  • 每個數據行中的字串傳回多久時間?

系統可能會有非常小的預設值,並受益於微調。 例如:Oracle 的預設值 fetchSize 為 10。 將它增加至 100,可減少需要以 10 乘以 10 執行的總查詢數目。 JDBC 結果為網路流量,因此請避免非常大量,但最佳值可能位於數千個數據集中。

fetchSize使用 選項,如下列範例所示:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()