Mengkueri database menggunakan JDBC
Azure Databricks mendukung menyambungkan ke database eksternal menggunakan JDBC. Artikel ini menyediakan sintaks dasar untuk mengonfigurasi dan menggunakan koneksi ini dengan contoh di Python, SQL, dan Scala.
Catatan
Anda mungkin lebih suka Federasi Lakehouse untuk mengelola kueri ke sistem database eksternal. Lihat Apa itu Federasi Lakehouse.
Mitra Koneksi menyediakan integrasi yang dioptimalkan untuk menyinkronkan data dengan banyak sumber data eksternal. Lihat Apa itu Databricks Partner Connect?.
Penting
Contoh dalam artikel ini tidak menyertakan nama pengguna dan kata sandi di URL JDBC. Databricks merekomendasikan penggunaan rahasia untuk menyimpan kredensial database Anda. Misalnya:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Untuk mereferensikan rahasia Databricks dengan SQL, Anda harus mengonfigurasi properti konfigurasi Spark selama initilisasi kluster.
Untuk contoh lengkap manajemen rahasia, lihat Contoh alur kerja rahasia.
Membaca data dengan JDBC
Anda harus mengonfigurasi sejumlah pengaturan untuk membaca data menggunakan JDBC. Perhatikan bahwa setiap database menggunakan format yang berbeda untuk <jdbc-url>
.
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark secara otomatis membaca skema dari tabel database dan memetakan jenisnya kembali ke jenis Spark SQL.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
Anda dapat menjalankan kueri terhadap tabel JDBC ini:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Menulis data dengan JDBC
Menyimpan data ke tabel dengan JDBC menggunakan konfigurasi serupa untuk membaca. Lihat contoh berikut:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Perilaku default mencoba membuat tabel baru dan melemparkan kesalahan jika tabel dengan nama tersebut sudah ada.
Anda dapat menambahkan data ke tabel yang sudah ada menggunakan sintaks berikut:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Anda bisa menimpa tabel yang sudah ada menggunakan sintaks berikut:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Mengontrol paralelisme untuk kueri JDBC
Secara default, driver JDBC meminta database sumber hanya dengan satu utas. Untuk meningkatkan performa bacaan, Anda perlu menentukan sejumlah opsi untuk mengontrol berapa banyak kueri simultan yang dibuat Azure Databricks ke database Anda. Untuk kluster kecil, mengatur numPartitions
opsi yang sama dengan jumlah inti pelaksana di kluster Anda memastikan bahwa semua data kueri simpul secara paralel.
Peringatan
Mengatur numPartitions
ke nilai tinggi pada kluster besar dapat mengakibatkan performa negatif untuk database jarak jauh, karena terlalu banyak kueri simultan yang mungkin membanjiri layanan. Ini sangat merepotkan untuk database aplikasi. Waspadalah terhadap pengaturan nilai ini di atas 50.
Catatan
Percepat kueri dengan memilih kolom dengan indeks yang dihitung dalam database sumber untuk partitionColumn
.
Contoh kode berikut menunjukkan konfigurasi paralelisme untuk kluster dengan delapan inti:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Catatan
Azure Databricks mendukung semua opsi Apache Spark untuk mengonfigurasi JDBC.
Saat menulis ke database menggunakan JDBC, Apache Spark menggunakan jumlah partisi dalam memori untuk mengontrol paralelisme. Anda dapat mempartisi ulang data sebelum menulis untuk mengontrol paralelisme. Hindari jumlah partisi yang tinggi pada kluster besar untuk menghindari kewalahan database jarak jauh Anda. Contoh berikut menunjukkan partisi ulang ke delapan partisi sebelum menulis:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Menekan kueri ke mesin database
Anda dapat menekan seluruh kueri ke database dan mengembalikan hasilnya. table
Parameter mengidentifikasi tabel JDBC untuk dibaca. Anda dapat menggunakan apa pun yang valid dalam klausa FROM
kueri SQL.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Mengontrol jumlah baris yang diambil per kueri
Driver JDBC memiliki fetchSize
parameter yang mengontrol jumlah baris yang diambil pada satu waktu dari database jarak jauh.
Pengaturan | Hasil |
---|---|
Terlalu rendah | Latensi tinggi karena banyak perjalanan pulang pergi (beberapa baris yang dikembalikan per kueri) |
Terlalu tinggi | Kesalahan kehabisan memori (terlalu banyak data yang dikembalikan dalam satu kueri) |
Nilai optimal tergantung pada beban kerja. Pertimbangannya meliputi:
- Berapa banyak kolom yang dikembalikan oleh kueri?
- Jenis data apa yang dikembalikan?
- Berapa lama string di setiap kolom dikembalikan?
Sistem mungkin memiliki default yang sangat kecil dan mendapat manfaat dari penyetelan. Misalnya: Default fetchSize
Oracle adalah 10. Meningkatkannya menjadi 100 mengurangi jumlah total kueri yang perlu dijalankan dengan faktor 10. Hasil JDBC adalah lalu lintas jaringan, jadi hindari jumlah yang sangat besar, tetapi nilai optimal mungkin dalam ribuan untuk banyak himpunan data.
fetchSize
Gunakan opsi , seperti dalam contoh berikut:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()