Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Penting
Lakebase Provisioned adalah penawaran Lakebase asli yang menggunakan komputasi yang disediakan yang Anda skalakan secara manual. Untuk wilayah yang didukung, lihat Ketersediaan wilayah. Untuk versi terbaru Lakebase, dengan komputasi penskalaan otomatis, skala-ke-nol, pencabangan, dan pemulihan instan, lihat Penskalaan Otomatis Lakebase.
Instans Lakebase baru akan dibuat sebagai proyek Autoscaling. Peluncuran dimulai 12 Maret 2026. Untuk informasi lebih lanjut, lihat Penskalaan otomatis yang diaktifkan secara default.
Halaman ini berisi contoh kode yang menunjukkan kepada Anda cara mengakses instans database Lakebase Anda melalui notebook Azure Databricks dan menjalankan kueri menggunakan Python dan Scala.
Contohnya mencakup strategi koneksi yang berbeda agar sesuai dengan kasus penggunaan yang berbeda:
- Koneksi tunggal: Digunakan untuk skrip sederhana di mana koneksi database tunggal dibuka, digunakan, dan ditutup.
- Kumpulan koneksi: Digunakan untuk beban kerja dengan konkurensi tinggi, di mana kumpulan koneksi yang dapat digunakan kembali disediakan.
- Memutar token M2M OAuth: Menggunakan token OAuth berumur pendek yang di-refresh secara otomatis untuk autentikasi.
Contoh berikut secara terprogram menghasilkan kredensial yang aman. Hindari langsung memasukkan kredensial ke dalam buku catatan. Databricks merekomendasikan penggunaan salah satu metode aman berikut:
- Simpan kata sandi Postgres di rahasia Azure Databricks.
- Hasilkan token OAuth menggunakan M2M OAuth.
Sebelum Anda mulai
Pastikan Anda memenuhi persyaratan berikut sebelum mengakses instans database Anda:
- Anda memiliki peran Postgres yang sesuai untuk masuk ke instans database. Lihat Mengelola peran PostgreSQL.
- Anda memiliki peran Postgres yang telah diberikan izin yang diperlukan untuk mengakses basis data, skema, atau tabel.
- Anda dapat mengautentikasi ke instans database. Jika Anda harus mendapatkan token OAuth secara manual untuk instans database Anda, lihat Mengautentikasi ke instans database.
Peringatan
Jika Anda menggunakan tautan privat, Anda perlu menggunakan satu kluster pengguna.
Phyton
Azure Databricks Python SDK dapat digunakan untuk mendapatkan token OAuth untuk instans database masing-masing.
Sambungkan ke instans database Anda dari buku catatan Azure Databricks menggunakan pustaka Python berikut:
-
psycopg(psycopg3) -
psycopgdengan pengumpulan koneksi SQLAlchemy
Prasyarat
Sebelum menjalankan contoh kode berikut, tingkatkan Databricks SDK untuk Python ke versi 0.61.0 atau yang lebih baru, lalu mulai ulang Python.
%pip install databricks-sdk>=0.61.0
%restart_python
psycopg
Contoh kode menunjukkan satu koneksi dan penggunaan kumpulan koneksi. Untuk informasi selengkapnya tentang cara mendapatkan instans database dan kredensial secara terprogram, lihat cara mendapatkan token OAuth menggunakan Python SDK.
%pip install "psycopg[binary,pool]"
Koneksi tunggal
import psycopg
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Connection parameters
conn = psycopg.connect(
host = instance.read_write_dns,
dbname = "databricks_postgres",
user = "<YOUR USER>",
password = cred.token,
sslmode = "require"
)
# Execute query
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()[0]
print(version)
conn.close()
Kumpulan koneksi
import psycopg
from psycopg_pool import ConnectionPool
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Create a connection pool
connection_pool = ConnectionPool(
conninfo=f"dbname=databricks_postgres user=<YOUR USER> host={instance.read_write_dns} port=5432 password={cred.token} sslmode=require",
min_size=1, # Minimum number of connections in the pool
max_size=10, # Maximum number of connections in the pool
open=True
)
print("Connection pool created successfully")
def executeWithPgConnection(execFn):
with connection_pool.connection() as connection:
print("Successfully received a connection from the pool")
execFn(connection)
print("Connection returned to the pool")
def printVersion(connection):
cursor = connection.cursor()
cursor.execute("SELECT version()")
version = cursor.fetchone()
print(f"Connected to PostgreSQL database. Version: {version}")
executeWithPgConnection(printVersion)
psycopg3
Contoh kode menunjukkan penggunaan kumpulan koneksi dengan M2M OAuth yang berputar. Ini menggunakan generate_database_credential(). Untuk informasi selengkapnya tentang cara mendapatkan instans database dan kredensial secara terprogram, lihat cara mendapatkan token OAuth menggunakan Python SDK.
%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid
import psycopg
import string
from psycopg_pool import ConnectionPool
w = WorkspaceClient()
class CustomConnection(psycopg.Connection):
global w
def __init__(self, *args, **kwargs):
# Call the parent class constructor
super().__init__(*args, **kwargs)
@classmethod
def connect(cls, conninfo='', **kwargs):
# Append the new password to kwargs
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
kwargs['password'] = cred.token
# Call the superclass's connect method with updated kwargs
return super().connect(conninfo, **kwargs)
username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
pool = ConnectionPool(
conninfo=f"dbname={database} user={username} host={host}",
connection_class=CustomConnection,
min_size=1,
max_size=10,
open=True
)
with pool.connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT version()")
for record in cursor:
print(record)
SQLAlchemy
Contoh kode menunjukkan satu koneksi dan penggunaan kumpulan koneksi dengan token M2M OAuth yang berputar. Untuk informasi selengkapnya tentang cara mendapatkan instans database dan kredensial secara terprogram, lihat cara mendapatkan token OAuth menggunakan Python SDK.
Koneksi tunggal
%pip install "sqlalchemy>=2.0" "psycopg[binary]"
from sqlalchemy import create_engine, text
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token
connection_pool = create_engine(f"postgresql+psycopg://{user}:{password}@{host}:{port}/{database}?sslmode=require")
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
Kumpulan koneksi & memutar M2M OAuth
%pip install "sqlalchemy>=2.0" "psycopg[binary]"
from databricks.sdk import WorkspaceClient
import uuid
import time
from sqlalchemy import create_engine, text, event
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()
@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
global postgres_password, last_password_refresh, host
if postgres_password is None or time.time() - last_password_refresh > 900:
print("Refreshing PostgreSQL OAuth token")
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
postgres_password = cred.token
last_password_refresh = time.time()
cparams["password"] = postgres_password
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
Scala
Contoh kode menunjukkan cara mendapatkan instans dan kredensial database secara terprogram, dan cara menyambungkan ke instans database menggunakan satu koneksi atau kumpulan koneksi.
Langkah 1: Gunakan Azure Databricks Java SDK untuk mendapatkan token OAuth
Untuk detail tentang cara mendapatkan instans database dan kredensial secara terprogram, lihat cara mendapatkan token OAuth menggunakan Java SDK.
Langkah 2: Menyambungkan ke instans database
Koneksi tunggal
import java.sql.{Connection, DriverManager, ResultSet, Statement}
Class.forName("org.postgresql.Driver")
val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()
val url = f"jdbc:postgresql://${host}:${port}/${database}"
val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")
val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")
if (resultSet.next()) {
val version = resultSet.getString(1)
println(s"PostgreSQL version: $version")
}
Kumpulan koneksi
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection
// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)
// Create a data source
val dataSource = new HikariDataSource(config)
// Function to get a connection and execute a query
def runQuery(): Unit = {
var connection: Connection = null
try {
// Get a connection from the pool
connection = dataSource.getConnection()
// Create a statement
val statement = connection.createStatement()
// Execute a query
val resultSet = statement.executeQuery("SELECT version() AS v;")
// Process the result set
while (resultSet.next()) {
val v = resultSet.getString("v")
println(s"*******Connected to PostgreSQL database. Version: $v")
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
// Close the connection which returns it to the pool
if (connection != null) connection.close()
}
}
// Run the query
runQuery()
// Close the data source
dataSource.close()