Condividi tramite


Usare un notebook per accedere a un'istanza del database

Importante

Questa funzionalità è disponibile in anteprima pubblica nelle aree seguenti: westus, westus2, eastus, eastus2, centralus, southcentralusnortheuropewesteuropeaustraliaeastbrazilsouthcanadacentralcentralindiasoutheastasia. uksouth

Questa pagina contiene esempi di codice che illustrano come accedere all'istanza del database Lakebase tramite i notebook di Azure Databricks ed eseguire query usando Python e Scala.

Gli esempi riguardano diverse strategie di connessione per soddisfare casi d'uso diversi:

  • Connessione singola: usata per semplici script in cui viene aperta, usata e chiusa una singola connessione di database.
  • Pool di connessioni: usato per carichi di lavoro a concorrenza elevata, in cui viene mantenuto un pool di connessioni riutilizzabili.
  • Rotazione del token OAuth M2M: usa token OAuth di breve durata aggiornati automaticamente per l'autenticazione.

Gli esempi seguenti generano credenziali sicure a livello di codice. Evitare di inserire direttamente le credenziali in un notebook. Databricks consiglia di usare uno dei metodi sicuri seguenti:

  • Archiviare le password Postgres nei segreti di Azure Databricks.
  • Generare token OAuth usando OAuth M2M.

Prima di iniziare

Assicurarsi di soddisfare i requisiti seguenti prima di accedere all'istanza del database:

  • Si dispone di un ruolo Postgres corrispondente per accedere all'istanza del database. Vedere Gestire i ruoli di Postgres.
  • Al ruolo Postgres vengono concesse le autorizzazioni necessarie per accedere al database, allo schema o alla tabella.
  • È possibile eseguire l'autenticazione all'istanza del database. Se è necessario ottenere manualmente un token OAuth per l'istanza del database, vedere Eseguire l'autenticazione a un'istanza del database.

Avvertimento

Se si usa un collegamento privato, è necessario usare un singolo cluster utente.

Pitone

Azure Databricks Python SDK può essere usato per ottenere un token OAuth per una rispettiva istanza del database.

Connettersi all'istanza del database da un notebook di Azure Databricks usando le librerie Python seguenti:

  • psycopg2
  • psycopg3
  • SQLAlchemy

Prerequisiti

Prima di eseguire gli esempi di codice seguenti, aggiornare Databricks SDK per Python alla versione 0.61.0 o successiva, quindi riavviare Python.

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg2

Gli esempi di codice illustrano una singola connessione e l'uso di un pool di connessioni. Per altre informazioni su come ottenere l'istanza del database e le credenziali a livello di codice, vedere come ottenere un token OAuth usando Python SDK.

Connessione singola

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

Pool di connessioni

import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres'
)
if connection_pool:
    print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

L'esempio di codice illustra l'uso di un pool di connessioni con un OAuth M2M rotante. Usa generate_database_credential(). Per altre informazioni su come ottenere l'istanza del database e le credenziali a livello di codice, vedere come ottenere un token OAuth usando Python SDK.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

Gli esempi di codice illustrano una singola connessione e l'uso di un pool di connessioni con un token OAuth M2M rotante. Per altre informazioni su come ottenere l'istanza del database e le credenziali a livello di codice, vedere come ottenere un token OAuth usando Python SDK.

Connessione singola

%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Pool di connessioni e rotazione delle credenziali OAuth M2M

%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

Negli esempi di codice viene illustrato come ottenere a livello di codice l'istanza e le credenziali del database e come connettersi a un'istanza del database usando una singola connessione o un pool di connessioni.

Passaggio 1: Usare Azure Databricks Java SDK per ottenere un token OAuth

Per informazioni dettagliate su come ottenere l'istanza del database e le credenziali a livello di codice, vedere come ottenere un token OAuth usando Java SDK.

Passaggio 2: Connettersi a un'istanza del database

Connessione singola

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

Pool di connessioni

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()