Azure Databricks では、JDBC を使用した外部データベースへの接続がサポートされています。 この記事では、Python、SQL、Scala の例でこれらの接続を構成および使用するための基本的な構文について説明します。
Von Bedeutung
従来のクエリフェデレーションドキュメントは廃止され、更新されない可能性があります。 このコンテンツに記載されている構成は、Databricks によって公式に承認またはテストされていません。 Lakehouse Federation がソース データベースをサポートしている場合、Databricks では代わりにこれを使用することをお勧めします。
Partner Connect は、多くの外部データ ソースとデータを同期するために最適化された統合を提供します。 「Databricks Partner Connect とは」を参照してください。
Von Bedeutung
この記事の例では、JDBC URL にユーザー名とパスワードは含まれません。 Databricks では、 シークレット を使用してデータベース資格情報を格納することをお勧めします。 例えば次が挙げられます。
Python(プログラミング言語)
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
スカラ (プログラミング言語)
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
SQL で Databricks シークレットを参照するには、クラスターの 初期化中に Spark 構成プロパティを構成する必要があります。
シークレット管理の完全な例については、「 チュートリアル: Databricks シークレットを作成して使用する」を参照してください。
JDBC を使用してデータを読み取る
JDBC を使用してデータを読み取るために、いくつかの設定を構成する必要があります。 各データベースでは、 <jdbc-url>
に異なる形式が使用されることに注意してください。
Python(プログラミング言語)
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
スカラ (プログラミング言語)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark は、データベース テーブルからスキーマを自動的に読み取り、その型を Spark SQL 型にマップします。
Python(プログラミング言語)
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
スカラ (プログラミング言語)
employees_table.printSchema
この JDBC テーブルに対してクエリを実行できます。
Python(プログラミング言語)
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
スカラ (プログラミング言語)
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
JDBC を使用してデータを書き込む
JDBC を使用してテーブルにデータを保存すると、読み取りと同様の構成が使用されます。 次の例を参照してください。
Python(プログラミング言語)
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
スカラ (プログラミング言語)
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
既定の動作では、新しいテーブルの作成が試みられ、その名前のテーブルが既に存在する場合はエラーがスローされます。
次の構文を使用して、既存のテーブルにデータを追加できます。
Python(プログラミング言語)
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
スカラ (プログラミング言語)
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
次の構文を使用して、既存のテーブルを上書きできます。
Python(プログラミング言語)
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
スカラ (プログラミング言語)
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
JDBC クエリの並列処理を制御する
既定では、JDBC ドライバーは 1 つのスレッドのみでソース データベースに対してクエリを実行します。 読み取りのパフォーマンスを向上させるには、Azure Databricks がデータベースに対して同時に実行するクエリの数を制御するためのオプションを多数指定する必要があります。 小規模クラスターの場合、 numPartitions
オプションをクラスター内の Executor コアの数と同じに設定すると、すべてのノードがデータのクエリを並列で実行できます。
Warnung
大規模なクラスターで numPartitions
を高い値に設定すると、同時クエリが多すぎるとサービスに負荷がかかる可能性があるため、リモート データベースのパフォーマンスが低下する可能性があります。 これは、アプリケーション データベースでは特に面倒です。 この値を 50 より上に設定する場合は注意してください。
注
partitionColumn
のソース データベースで計算されたインデックスを持つ列を選択して、クエリを高速化します。
次のコード例は、8 つのコアを持つクラスターの並列処理を構成する方法を示しています。
Python(プログラミング言語)
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
スカラ (プログラミング言語)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
注
Azure Databricks では、 JDBC を構成するためのすべての Apache Spark オプションがサポートされています。
JDBC を使用してデータベースに書き込む場合、Apache Spark はメモリ内のパーティションの数を使用して並列処理を制御します。 書き込み前にデータを再パーティションして並列処理を制御できます。 リモート データベースが過負荷にならないように、大規模なクラスターでは多数のパーティションを使用しないようにします。 次の例では、書き込み前に 8 つのパーティションに再分割する手順を示します。
Python(プログラミング言語)
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
スカラ (プログラミング言語)
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
クエリをデータベース エンジンにプッシュダウンする
クエリ全体をデータベースにプッシュダウンし、結果のみを返すことができます。
table
パラメーターは、読み取る JDBC テーブルを識別します。 SQL クエリ FROM
句で有効なものは何でも使用できます。
Python(プログラミング言語)
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
スカラ (プログラミング言語)
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
クエリごとにフェッチされる行数を制御する
JDBC ドライバーには、リモート データベースから一度にフェッチされる行数を制御する fetchSize
パラメーターがあります。
設定 | 結果 |
---|---|
低すぎる | ラウンドトリップが多いため、待機時間が長くなります (クエリごとに返される行は少ない) |
高すぎる | メモリ不足エラー (1 つのクエリで返されるデータが多すぎます) |
最適な値はワークロードに依存します。 考慮事項は次のとおりです。
- クエリによって返される列の数はいくつですか?
- 返されるデータ型
- 各列の文字列はどのくらいの期間返されますか?
システムはデフォルト値が非常に小さいため、チューニングによって改善が期待できます。 たとえば、Oracle の既定の fetchSize
は 10 です。 これを 100 に増やすと、実行する必要があるクエリの合計数が 10 倍減ります。 JDBC の結果はネットワーク トラフィックであるため、非常に大きな数を避けますが、多くのデータセットでは最適な値が数千に含まれる可能性があります。
次の例のように、 fetchSize
オプションを使用します。
Python(プログラミング言語)
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
スカラ (プログラミング言語)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()