Important
Lakebase Provisioned は、 westus、 westus2、 eastus、 eastus2、 centralus、 southcentralus、 northeurope、 westeurope、 australiaeast、 brazilsouth、 canadacentral、 centralindia、 southeastasia、 uksouthの各リージョンで利用できます。
Lakebase Provisioned は、手動でスケーリングするプロビジョニング済みコンピューティングを使用する元の Lakebase オファリングです。 自動スケール コンピューティング、ゼロへのスケール、分岐、インスタント リストアを使用した最新バージョンの Lakebase については、 Lakebase の自動スケールに関するドキュメントを参照してください。
新しい Lakebase インスタンスは、自動スケール プロジェクトとして作成されます。 ロールアウトは 2026 年 3 月 12 日に開始されます。 詳細については、 既定での自動スケールに関するページを参照してください。
このページには、 Azure Databricks ノートブック を使用して Lakebase データベース インスタンスにアクセスし、Python と Scala を使用してクエリを実行する方法を示すコード例が含まれています。
例では、さまざまなユース ケースに合わせてさまざまな接続方法について説明します。
- 単一接続: 単一のデータベース接続が開き、使用され、閉じられる単純なスクリプトに使用されます。
- 接続プール: 再利用可能な接続のプールが維持される、コンカレンシーの高いワークロードに使用されます。
- M2M OAuth トークンのローテーション: 有効期間が短く、自動的に更新された OAuth トークンを認証に使用します。
次の例では、セキュリティで保護された資格情報をプログラムで生成します。 ノートブックに資格情報を直接配置することは避けてください。 Databricks では、次のいずれかの安全な方法を使用することをお勧めします。
- Postgres パスワードを Azure Databricks シークレットに格納します。
- M2M OAuth を使用して OAuth トークンを生成します。
開始する前に
データベース インスタンスにアクセスする前に、次の要件を満たしていることを確認してください。
- データベース インスタンスにログインするための対応する Postgres ロールがあります。 PostgreSQL ロールの管理を参照してください。
- Postgres ロールには、データベース、スキーマ、またはテーブルにアクセスするために必要なアクセス許可が付与されます。
- データベース インスタンスに対して認証できます。 データベース インスタンスの OAuth トークンを手動で取得する必要がある場合は、「 データベース インスタンスに対する認証」を参照してください。
Warnung
プライベート リンクを使用している場合は、1 人のユーザー クラスターを使用する必要があります。
Python
Azure Databricks Python SDK を使用して、それぞれのデータベース インスタンスの OAuth トークンを取得できます。
次の Python ライブラリを使用して、Azure Databricks ノートブックからデータベース インスタンスに接続します。
psycopg2psycopg3SQLAlchemy
[前提条件]
次のコード例を実行する前に、Databricks SDK for Python をバージョン 0.61.0 以降にアップグレードしてから、Python を再起動します。
%pip install databricks-sdk>=0.61.0
%restart_python
psycopg2
コード例では、1 つの接続と接続プールの使用を示します。 データベース インスタンスと資格情報をプログラムで取得する方法の詳細については、 Python SDK を使用して OAuth トークンを取得する方法を参照してください。
単一接続
import psycopg2
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Connection parameters
conn = psycopg2.connect(
host = instance.read_write_dns,
dbname = "databricks_postgres",
user = "<YOUR USER>",
password = cred.token,
sslmode = "require"
)
# Execute query
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()[0]
print(version)
conn.close()
接続プール
import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
minconn = 1, # Minimum number of connections in the pool
maxconn = 10, # Maximum number of connections in the pool
user = "<YOUR USER>",
password = cred.token,
host = instance.read_write_dns,
port = '5432',
database = 'databricks_postgres'
)
if connection_pool:
print("Connection pool created successfully")
def executeWithPgConnection(execFn):
connection = None
try:
# Get a connection from the pool
connection = connection_pool.getconn()
if connection:
print("Successfully received a connection from the pool")
execFn(connection)
finally:
# Release the connection back to the pool
if connection:
connection_pool.putconn(connection)
print("Connection returned to the pool")
def printVersion(connection):
cursor = connection.cursor()
cursor.execute("SELECT version()")
version = cursor.fetchone()
print(f"Connected to PostgreSQL database. Version: {version}")
executeWithPgConnection(printVersion)
psycopg3
このコード例では、M2M OAuth をローテーションする接続プールの使用を示します。
generate_database_credential()を使用します。 データベース インスタンスと資格情報をプログラムで取得する方法の詳細については、 Python SDK を使用して OAuth トークンを取得する方法を参照してください。
%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid
import psycopg
import string
from psycopg_pool import ConnectionPool
w = WorkspaceClient()
class CustomConnection(psycopg.Connection):
global w
def __init__(self, *args, **kwargs):
# Call the parent class constructor
super().__init__(*args, **kwargs)
@classmethod
def connect(cls, conninfo='', **kwargs):
# Append the new password to kwargs
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
kwargs['password'] = cred.token
# Call the superclass's connect method with updated kwargs
return super().connect(conninfo, **kwargs)
username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
pool = ConnectionPool(
conninfo=f"dbname={database} user={username} host={host}",
connection_class=CustomConnection,
min_size=1,
max_size=10,
open=True
)
with pool.connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT version()")
for record in cursor:
print(record)
SQLAlchemy
コード例では、1 つの接続と、M2M OAuth トークンをローテーションする接続プールの使用を示します。 データベース インスタンスと資格情報をプログラムで取得する方法の詳細については、 Python SDK を使用して OAuth トークンを取得する方法を参照してください。
単一接続
%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token
connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
接続プール & M2M OAuthのローテーション
%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time
from sqlalchemy import create_engine, text, event
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()
@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
global postgres_password, last_password_refresh, host
if postgres_password is None or time.time() - last_password_refresh > 900:
print("Refreshing PostgreSQL OAuth token")
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
postgres_password = cred.token
last_password_refresh = time.time()
cparams["password"] = postgres_password
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
Scala
コード例は、プログラムによってデータベース インスタンスと資格情報を取得する方法と、単一の接続または接続プールを使用してデータベース インスタンスに接続する方法を示しています。
手順 1: Azure Databricks Java SDK を使用して OAuth トークンを取得する
データベース インスタンスと資格情報をプログラムで取得する方法の詳細については、 Java SDK を使用して OAuth トークンを取得する方法を参照してください。
手順 2: データベース インスタンスに接続する
単一接続
import java.sql.{Connection, DriverManager, ResultSet, Statement}
Class.forName("org.postgresql.Driver")
val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()
val url = f"jdbc:postgresql://${host}:${port}/${database}"
val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")
val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")
if (resultSet.next()) {
val version = resultSet.getString(1)
println(s"PostgreSQL version: $version")
}
接続プール
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection
// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)
// Create a data source
val dataSource = new HikariDataSource(config)
// Function to get a connection and execute a query
def runQuery(): Unit = {
var connection: Connection = null
try {
// Get a connection from the pool
connection = dataSource.getConnection()
// Create a statement
val statement = connection.createStatement()
// Execute a query
val resultSet = statement.executeQuery("SELECT version() AS v;")
// Process the result set
while (resultSet.next()) {
val v = resultSet.getString("v")
println(s"*******Connected to PostgreSQL database. Version: $v")
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
// Close the connection which returns it to the pool
if (connection != null) connection.close()
}
}
// Run the query
runQuery()
// Close the data source
dataSource.close()