Mengkueri database menggunakan JDBC

Azure Databricks mendukung menyambungkan ke database eksternal menggunakan JDBC. Artikel ini menyediakan sintaks dasar untuk mengonfigurasi dan menggunakan koneksi ini dengan contoh di Python, SQL, dan Scala.

Catatan

Anda mungkin lebih suka Federasi Lakehouse untuk mengelola kueri ke sistem database eksternal. Lihat Apa itu Federasi Lakehouse.

Mitra Koneksi menyediakan integrasi yang dioptimalkan untuk menyinkronkan data dengan banyak sumber data eksternal. Lihat Apa itu Databricks Partner Connect?.

Penting

Contoh dalam artikel ini tidak menyertakan nama pengguna dan kata sandi di URL JDBC. Databricks merekomendasikan penggunaan rahasia untuk menyimpan kredensial database Anda. Misalnya:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Untuk mereferensikan rahasia Databricks dengan SQL, Anda harus mengonfigurasi properti konfigurasi Spark selama initilisasi kluster.

Untuk contoh lengkap manajemen rahasia, lihat Contoh alur kerja rahasia.

Membaca data dengan JDBC

Anda harus mengonfigurasi sejumlah pengaturan untuk membaca data menggunakan JDBC. Perhatikan bahwa setiap database menggunakan format yang berbeda untuk <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark secara otomatis membaca skema dari tabel database dan memetakan jenisnya kembali ke jenis Spark SQL.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Anda dapat menjalankan kueri terhadap tabel JDBC ini:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Menulis data dengan JDBC

Menyimpan data ke tabel dengan JDBC menggunakan konfigurasi serupa untuk membaca. Lihat contoh berikut:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Perilaku default mencoba membuat tabel baru dan melemparkan kesalahan jika tabel dengan nama tersebut sudah ada.

Anda dapat menambahkan data ke tabel yang sudah ada menggunakan sintaks berikut:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Anda bisa menimpa tabel yang sudah ada menggunakan sintaks berikut:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Mengontrol paralelisme untuk kueri JDBC

Secara default, driver JDBC meminta database sumber hanya dengan satu utas. Untuk meningkatkan performa bacaan, Anda perlu menentukan sejumlah opsi untuk mengontrol berapa banyak kueri simultan yang dibuat Azure Databricks ke database Anda. Untuk kluster kecil, mengatur numPartitions opsi yang sama dengan jumlah inti pelaksana di kluster Anda memastikan bahwa semua data kueri simpul secara paralel.

Peringatan

Mengatur numPartitions ke nilai tinggi pada kluster besar dapat mengakibatkan performa negatif untuk database jarak jauh, karena terlalu banyak kueri simultan yang mungkin membanjiri layanan. Ini sangat merepotkan untuk database aplikasi. Waspadalah terhadap pengaturan nilai ini di atas 50.

Catatan

Percepat kueri dengan memilih kolom dengan indeks yang dihitung dalam database sumber untuk partitionColumn.

Contoh kode berikut menunjukkan konfigurasi paralelisme untuk kluster dengan delapan inti:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Catatan

Azure Databricks mendukung semua opsi Apache Spark untuk mengonfigurasi JDBC.

Saat menulis ke database menggunakan JDBC, Apache Spark menggunakan jumlah partisi dalam memori untuk mengontrol paralelisme. Anda dapat mempartisi ulang data sebelum menulis untuk mengontrol paralelisme. Hindari jumlah partisi yang tinggi pada kluster besar untuk menghindari kewalahan database jarak jauh Anda. Contoh berikut menunjukkan partisi ulang ke delapan partisi sebelum menulis:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Menekan kueri ke mesin database

Anda dapat menekan seluruh kueri ke database dan mengembalikan hasilnya. table Parameter mengidentifikasi tabel JDBC untuk dibaca. Anda dapat menggunakan apa pun yang valid dalam klausa FROM kueri SQL.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Mengontrol jumlah baris yang diambil per kueri

Driver JDBC memiliki fetchSize parameter yang mengontrol jumlah baris yang diambil pada satu waktu dari database jarak jauh.

Pengaturan Hasil
Terlalu rendah Latensi tinggi karena banyak perjalanan pulang pergi (beberapa baris yang dikembalikan per kueri)
Terlalu tinggi Kesalahan kehabisan memori (terlalu banyak data yang dikembalikan dalam satu kueri)

Nilai optimal tergantung pada beban kerja. Pertimbangannya meliputi:

  • Berapa banyak kolom yang dikembalikan oleh kueri?
  • Jenis data apa yang dikembalikan?
  • Berapa lama string di setiap kolom dikembalikan?

Sistem mungkin memiliki default yang sangat kecil dan mendapat manfaat dari penyetelan. Misalnya: Default fetchSize Oracle adalah 10. Meningkatkannya menjadi 100 mengurangi jumlah total kueri yang perlu dijalankan dengan faktor 10. Hasil JDBC adalah lalu lintas jaringan, jadi hindari jumlah yang sangat besar, tetapi nilai optimal mungkin dalam ribuan untuk banyak himpunan data.

fetchSize Gunakan opsi , seperti dalam contoh berikut:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()