Menggunakan buku catatan untuk mengakses instans database

Penting

Lakebase Provisioned adalah penawaran Lakebase asli yang menggunakan komputasi yang disediakan yang Anda skalakan secara manual. Untuk wilayah yang didukung, lihat Ketersediaan wilayah. Untuk versi terbaru Lakebase, dengan komputasi penskalaan otomatis, skala-ke-nol, pencabangan, dan pemulihan instan, lihat Penskalaan Otomatis Lakebase.

Instans Lakebase baru akan dibuat sebagai proyek Autoscaling. Peluncuran dimulai 12 Maret 2026. Untuk informasi lebih lanjut, lihat Penskalaan otomatis yang diaktifkan secara default.

Halaman ini berisi contoh kode yang menunjukkan kepada Anda cara mengakses instans database Lakebase Anda melalui notebook Azure Databricks dan menjalankan kueri menggunakan Python dan Scala.

Contohnya mencakup strategi koneksi yang berbeda agar sesuai dengan kasus penggunaan yang berbeda:

  • Koneksi tunggal: Digunakan untuk skrip sederhana di mana koneksi database tunggal dibuka, digunakan, dan ditutup.
  • Kumpulan koneksi: Digunakan untuk beban kerja dengan konkurensi tinggi, di mana kumpulan koneksi yang dapat digunakan kembali disediakan.
  • Memutar token M2M OAuth: Menggunakan token OAuth berumur pendek yang di-refresh secara otomatis untuk autentikasi.

Contoh berikut secara terprogram menghasilkan kredensial yang aman. Hindari langsung memasukkan kredensial ke dalam buku catatan. Databricks merekomendasikan penggunaan salah satu metode aman berikut:

  • Simpan kata sandi Postgres di rahasia Azure Databricks.
  • Hasilkan token OAuth menggunakan M2M OAuth.

Sebelum Anda mulai

Pastikan Anda memenuhi persyaratan berikut sebelum mengakses instans database Anda:

  • Anda memiliki peran Postgres yang sesuai untuk masuk ke instans database. Lihat Mengelola peran PostgreSQL.
  • Anda memiliki peran Postgres yang telah diberikan izin yang diperlukan untuk mengakses basis data, skema, atau tabel.
  • Anda dapat mengautentikasi ke instans database. Jika Anda harus mendapatkan token OAuth secara manual untuk instans database Anda, lihat Mengautentikasi ke instans database.

Peringatan

Jika Anda menggunakan tautan privat, Anda perlu menggunakan satu kluster pengguna.

Phyton

Azure Databricks Python SDK dapat digunakan untuk mendapatkan token OAuth untuk instans database masing-masing.

Sambungkan ke instans database Anda dari buku catatan Azure Databricks menggunakan pustaka Python berikut:

  • psycopg (psycopg3)
  • psycopg dengan pengumpulan koneksi
  • SQLAlchemy

Prasyarat

Sebelum menjalankan contoh kode berikut, tingkatkan Databricks SDK untuk Python ke versi 0.61.0 atau yang lebih baru, lalu mulai ulang Python.

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg

Contoh kode menunjukkan satu koneksi dan penggunaan kumpulan koneksi. Untuk informasi selengkapnya tentang cara mendapatkan instans database dan kredensial secara terprogram, lihat cara mendapatkan token OAuth menggunakan Python SDK.

%pip install "psycopg[binary,pool]"

Koneksi tunggal

import psycopg

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

Kumpulan koneksi

import psycopg
from psycopg_pool import ConnectionPool

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = ConnectionPool(
    conninfo=f"dbname=databricks_postgres user=<YOUR USER> host={instance.read_write_dns} port=5432 password={cred.token} sslmode=require",
    min_size=1,  # Minimum number of connections in the pool
    max_size=10,  # Maximum number of connections in the pool
    open=True
)
print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    with connection_pool.connection() as connection:
        print("Successfully received a connection from the pool")
        execFn(connection)
        print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

Contoh kode menunjukkan penggunaan kumpulan koneksi dengan M2M OAuth yang berputar. Ini menggunakan generate_database_credential(). Untuk informasi selengkapnya tentang cara mendapatkan instans database dan kredensial secara terprogram, lihat cara mendapatkan token OAuth menggunakan Python SDK.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

Contoh kode menunjukkan satu koneksi dan penggunaan kumpulan koneksi dengan token M2M OAuth yang berputar. Untuk informasi selengkapnya tentang cara mendapatkan instans database dan kredensial secara terprogram, lihat cara mendapatkan token OAuth menggunakan Python SDK.

Koneksi tunggal

%pip install "sqlalchemy>=2.0" "psycopg[binary]"
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql+psycopg://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Kumpulan koneksi & memutar M2M OAuth

%pip install "sqlalchemy>=2.0" "psycopg[binary]"
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

Contoh kode menunjukkan cara mendapatkan instans dan kredensial database secara terprogram, dan cara menyambungkan ke instans database menggunakan satu koneksi atau kumpulan koneksi.

Langkah 1: Gunakan Azure Databricks Java SDK untuk mendapatkan token OAuth

Untuk detail tentang cara mendapatkan instans database dan kredensial secara terprogram, lihat cara mendapatkan token OAuth menggunakan Java SDK.

Langkah 2: Menyambungkan ke instans database

Koneksi tunggal

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

Kumpulan koneksi

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()