次の方法で共有


JDBC を使用したデータベースのクエリ

Azure Databricks では、JDBC を使用した外部データベースへの接続がサポートされています。 この記事では、Python、SQL、Scala の例でこれらの接続を構成および使用するための基本的な構文について説明します。

Von Bedeutung

従来のクエリフェデレーションドキュメントは廃止され、更新されない可能性があります。 このコンテンツに記載されている構成は、Databricks によって公式に承認またはテストされていません。 Lakehouse Federation がソース データベースをサポートしている場合、Databricks では代わりにこれを使用することをお勧めします。

Partner Connect は、多くの外部データ ソースとデータを同期するために最適化された統合を提供します。 「Databricks Partner Connect とは」を参照してください。

Von Bedeutung

この記事の例では、JDBC URL にユーザー名とパスワードは含まれません。 Databricks では、 シークレット を使用してデータベース資格情報を格納することをお勧めします。 例えば次が挙げられます。

Python(プログラミング言語)

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

スカラ (プログラミング言語)

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

SQL で Databricks シークレットを参照するには、クラスターの 初期化中に Spark 構成プロパティを構成する必要があります。

シークレット管理の完全な例については、「 チュートリアル: Databricks シークレットを作成して使用する」を参照してください。

JDBC を使用してデータを読み取る

JDBC を使用してデータを読み取るために、いくつかの設定を構成する必要があります。 各データベースでは、 <jdbc-url>に異なる形式が使用されることに注意してください。

Python(プログラミング言語)

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

スカラ (プログラミング言語)

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark は、データベース テーブルからスキーマを自動的に読み取り、その型を Spark SQL 型にマップします。

Python(プログラミング言語)

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

スカラ (プログラミング言語)

employees_table.printSchema

この JDBC テーブルに対してクエリを実行できます。

Python(プログラミング言語)

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

スカラ (プログラミング言語)

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

JDBC を使用してデータを書き込む

JDBC を使用してテーブルにデータを保存すると、読み取りと同様の構成が使用されます。 次の例を参照してください。

Python(プログラミング言語)

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

スカラ (プログラミング言語)

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

既定の動作では、新しいテーブルの作成が試みられ、その名前のテーブルが既に存在する場合はエラーがスローされます。

次の構文を使用して、既存のテーブルにデータを追加できます。

Python(プログラミング言語)

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

スカラ (プログラミング言語)

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

次の構文を使用して、既存のテーブルを上書きできます。

Python(プログラミング言語)

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

スカラ (プログラミング言語)

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

JDBC クエリの並列処理を制御する

既定では、JDBC ドライバーは 1 つのスレッドのみでソース データベースに対してクエリを実行します。 読み取りのパフォーマンスを向上させるには、Azure Databricks がデータベースに対して同時に実行するクエリの数を制御するためのオプションを多数指定する必要があります。 小規模クラスターの場合、 numPartitions オプションをクラスター内の Executor コアの数と同じに設定すると、すべてのノードがデータのクエリを並列で実行できます。

Warnung

大規模なクラスターで numPartitions を高い値に設定すると、同時クエリが多すぎるとサービスに負荷がかかる可能性があるため、リモート データベースのパフォーマンスが低下する可能性があります。 これは、アプリケーション データベースでは特に面倒です。 この値を 50 より上に設定する場合は注意してください。

partitionColumnのソース データベースで計算されたインデックスを持つ列を選択して、クエリを高速化します。

次のコード例は、8 つのコアを持つクラスターの並列処理を構成する方法を示しています。

Python(プログラミング言語)

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

スカラ (プログラミング言語)

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Azure Databricks では、 JDBC を構成するためのすべての Apache Spark オプションがサポートされています。

JDBC を使用してデータベースに書き込む場合、Apache Spark はメモリ内のパーティションの数を使用して並列処理を制御します。 書き込み前にデータを再パーティションして並列処理を制御できます。 リモート データベースが過負荷にならないように、大規模なクラスターでは多数のパーティションを使用しないようにします。 次の例では、書き込み前に 8 つのパーティションに再分割する手順を示します。

Python(プログラミング言語)

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

スカラ (プログラミング言語)

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

クエリをデータベース エンジンにプッシュダウンする

クエリ全体をデータベースにプッシュダウンし、結果のみを返すことができます。 table パラメーターは、読み取る JDBC テーブルを識別します。 SQL クエリ FROM 句で有効なものは何でも使用できます。

Python(プログラミング言語)

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

スカラ (プログラミング言語)

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

クエリごとにフェッチされる行数を制御する

JDBC ドライバーには、リモート データベースから一度にフェッチされる行数を制御する fetchSize パラメーターがあります。

設定 結果
低すぎる ラウンドトリップが多いため、待機時間が長くなります (クエリごとに返される行は少ない)
高すぎる メモリ不足エラー (1 つのクエリで返されるデータが多すぎます)

最適な値はワークロードに依存します。 考慮事項は次のとおりです。

  • クエリによって返される列の数はいくつですか?
  • 返されるデータ型
  • 各列の文字列はどのくらいの期間返されますか?

システムはデフォルト値が非常に小さいため、チューニングによって改善が期待できます。 たとえば、Oracle の既定の fetchSize は 10 です。 これを 100 に増やすと、実行する必要があるクエリの合計数が 10 倍減ります。 JDBC の結果はネットワーク トラフィックであるため、非常に大きな数を避けますが、多くのデータセットでは最適な値が数千に含まれる可能性があります。

次の例のように、 fetchSize オプションを使用します。

Python(プログラミング言語)

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

スカラ (プログラミング言語)

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()