Use a notebook to access a database instance

Important

This feature is in Public Preview in the following regions: westus, westus2, eastus, eastus2, northeurope, westeurope, australiaeast, brazilsouth, canadacentral, centralindia, centralus, southcentralus, southeastasia, uksouth.

This page contains code examples that show you how to access your Lakebase database instance through Azure Databricks notebooks and run queries using Python and Scala.

The examples cover different connection strategies to suit different use cases:

  • Single connection: Used for simple scripts where a single database connection is opened, used, and closed.
  • Connection pool: Used for high-concurrency workloads, where a pool of reusable connections is maintained.
  • Rotating M2M OAuth token: Uses short-lived, automatically refreshed OAuth tokens for authentication.

The following examples programmatically generate secure credentials. Avoid hard-coding credentials in a notebook. Databricks recommends using one of the following secure methods:

  • Store Postgres passwords in Azure Databricks secrets, as sketched after this list.
  • Generate OAuth tokens using M2M OAuth.
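
For example, if you store the Postgres password in an Azure Databricks secret, you can read it at runtime instead of embedding it in the notebook. This is a minimal sketch that assumes you have already created a secret scope named my-scope with a key named db-password (placeholder names); dbutils is available by default in Azure Databricks notebooks.

# Read the Postgres password from a secret; the value is redacted if printed in notebook output
password = dbutils.secrets.get(scope="my-scope", key="db-password")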

Before you begin

Ensure you meet the following requirements before accessing your database instance:

Python

The Azure Databricks Python SDK can be used to obtain an OAuth token for your database instance.

Connect to your database instance from an Azure Databricks notebook using the following Python libraries:

  • psycopg2
  • psycopg3
  • SQLAlchemy

Use the Azure Databricks Python SDK to obtain an OAuth token

Database SDK bindings are available in Python SDK version v0.56.0 or newer. If you are running an older version of the SDK, run these commands first.

%pip install --upgrade databricks-sdk
%restart_python

The following code uses the Azure Databricks SDK to generate a secure OAuth token, cred, for your database instance, instance. Replace <YOUR INSTANCE> with your database instance name.

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

psycopg2

The code examples demonstrate a single connection and the use of a connection pool. For more on how to obtain the database instance and credentials programmatically, see Use the Azure Databricks Python SDK to obtain an OAuth token.

Single connection

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

Connection pool

import psycopg2
from psycopg2 import pool

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres'
)
if connection_pool:
    print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

The code example demonstrates the use of a connection pool with a rotating M2M OAuth token, using generate_database_credential(). For more on how to obtain the database instance and credentials programmatically, see Use the Azure Databricks Python SDK to obtain an OAuth token.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Generate a fresh OAuth token and use it as the connection password
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

The code examples demonstrate a single connection and the use of a connection pool with a rotating M2M OAuth token. For more on how to obtain the database instance and credentials programmatically, see Use the Azure Databricks Python SDK to obtain an OAuth token.

Single connection

%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Connection pool & rotating M2M OAuth

%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

The code examples show how to programmatically obtain the database instance and credentials, and how to connect to a database instance using a single connection or a connection pool.

Step 1: Use the Azure Databricks Java SDK to obtain an OAuth token

Database SDK bindings are available in Java SDK version v0.53.0 or newer. If you are running an older version of the SDK, you might need to refresh the imported SDK.

%scala

import com.databricks.sdk.WorkspaceClient
import com.databricks.sdk.service.database.GetDatabaseInstanceRequest
import com.databricks.sdk.service.database.GenerateDatabaseCredentialRequest
import com.databricks.sdk.service.database.DatabaseInstance
import com.databricks.sdk.service.database.DatabaseCredential
import java.util.Collections
import java.util.UUID

val w = new WorkspaceClient()

val instanceName = "<YOUR INSTANCE>"
val databaseName =  "databricks_postgres"
val userName = "<YOUR USER>"

// Get database instance
val instance = w.database().getDatabaseInstance(
    new GetDatabaseInstanceRequest().setName(instanceName)
);

// Generate database credential
val cred = w.database().generateDatabaseCredential(
    new GenerateDatabaseCredentialRequest()
        .setRequestId(UUID.randomUUID().toString())
        .setInstanceNames(Collections.singletonList(instanceName))
);

// Print out credential details
println("Database instance: " + instance.getName())
println("Credential: " + cred.getToken())

Step 2: Connect to a database instance

Single connection

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getReadWriteDns()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

Connection pool

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl(s"jdbc:postgresql://${instance.getReadWriteDns()}:5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()