Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Importante
Questa funzionalità è disponibile in anteprima pubblica nelle aree seguenti: westus, westus2, eastus, eastus2, centralus, southcentralusnortheuropewesteuropeaustraliaeastbrazilsouthcanadacentralcentralindiasoutheastasia. uksouth
Questa pagina contiene esempi di codice che illustrano come accedere all'istanza del database Lakebase tramite i notebook di Azure Databricks ed eseguire query usando Python e Scala.
Gli esempi riguardano diverse strategie di connessione per soddisfare casi d'uso diversi:
- Connessione singola: usata per semplici script in cui viene aperta, usata e chiusa una singola connessione di database.
- Pool di connessioni: usato per carichi di lavoro a concorrenza elevata, in cui viene mantenuto un pool di connessioni riutilizzabili.
- Rotazione del token OAuth M2M: usa token OAuth di breve durata aggiornati automaticamente per l'autenticazione.
Gli esempi seguenti generano credenziali sicure a livello di codice. Evitare di inserire direttamente le credenziali in un notebook. Databricks consiglia di usare uno dei metodi sicuri seguenti:
- Archiviare le password Postgres nei segreti di Azure Databricks.
- Generare token OAuth usando OAuth M2M.
Prima di iniziare
Assicurarsi di soddisfare i requisiti seguenti prima di accedere all'istanza del database:
- Si dispone di un ruolo Postgres corrispondente per accedere all'istanza del database. Vedere Gestire i ruoli di Postgres.
- Al ruolo Postgres vengono concesse le autorizzazioni necessarie per accedere al database, allo schema o alla tabella.
- È possibile eseguire l'autenticazione all'istanza del database. Se è necessario ottenere manualmente un token OAuth per l'istanza del database, vedere Eseguire l'autenticazione a un'istanza del database.
Avvertimento
Se si usa un collegamento privato, è necessario usare un singolo cluster utente.
Pitone
Azure Databricks Python SDK può essere usato per ottenere un token OAuth per una rispettiva istanza del database.
Connettersi all'istanza del database da un notebook di Azure Databricks usando le librerie Python seguenti:
psycopg2psycopg3SQLAlchemy
Prerequisiti
Prima di eseguire gli esempi di codice seguenti, aggiornare Databricks SDK per Python alla versione 0.61.0 o successiva, quindi riavviare Python.
%pip install databricks-sdk>=0.61.0
%restart_python
psycopg2
Gli esempi di codice illustrano una singola connessione e l'uso di un pool di connessioni. Per altre informazioni su come ottenere l'istanza del database e le credenziali a livello di codice, vedere come ottenere un token OAuth usando Python SDK.
Connessione singola
import psycopg2
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Connection parameters
conn = psycopg2.connect(
host = instance.read_write_dns,
dbname = "databricks_postgres",
user = "<YOUR USER>",
password = cred.token,
sslmode = "require"
)
# Execute query
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()[0]
print(version)
conn.close()
Pool di connessioni
import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
minconn = 1, # Minimum number of connections in the pool
maxconn = 10, # Maximum number of connections in the pool
user = "<YOUR USER>",
password = cred.token,
host = instance.read_write_dns,
port = '5432',
database = 'databricks_postgres'
)
if connection_pool:
print("Connection pool created successfully")
def executeWithPgConnection(execFn):
connection = None
try:
# Get a connection from the pool
connection = connection_pool.getconn()
if connection:
print("Successfully received a connection from the pool")
execFn(connection)
finally:
# Release the connection back to the pool
if connection:
connection_pool.putconn(connection)
print("Connection returned to the pool")
def printVersion(connection):
cursor = connection.cursor()
cursor.execute("SELECT version()")
version = cursor.fetchone()
print(f"Connected to PostgreSQL database. Version: {version}")
executeWithPgConnection(printVersion)
psycopg3
L'esempio di codice illustra l'uso di un pool di connessioni con un OAuth M2M rotante. Usa generate_database_credential(). Per altre informazioni su come ottenere l'istanza del database e le credenziali a livello di codice, vedere come ottenere un token OAuth usando Python SDK.
%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid
import psycopg
import string
from psycopg_pool import ConnectionPool
w = WorkspaceClient()
class CustomConnection(psycopg.Connection):
global w
def __init__(self, *args, **kwargs):
# Call the parent class constructor
super().__init__(*args, **kwargs)
@classmethod
def connect(cls, conninfo='', **kwargs):
# Append the new password to kwargs
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
kwargs['password'] = cred.token
# Call the superclass's connect method with updated kwargs
return super().connect(conninfo, **kwargs)
username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
pool = ConnectionPool(
conninfo=f"dbname={database} user={username} host={host}",
connection_class=CustomConnection,
min_size=1,
max_size=10,
open=True
)
with pool.connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT version()")
for record in cursor:
print(record)
SQLAlchemy
Gli esempi di codice illustrano una singola connessione e l'uso di un pool di connessioni con un token OAuth M2M rotante. Per altre informazioni su come ottenere l'istanza del database e le credenziali a livello di codice, vedere come ottenere un token OAuth usando Python SDK.
Connessione singola
%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token
connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
Pool di connessioni e rotazione delle credenziali OAuth M2M
%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time
from sqlalchemy import create_engine, text, event
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()
@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
global postgres_password, last_password_refresh, host
if postgres_password is None or time.time() - last_password_refresh > 900:
print("Refreshing PostgreSQL OAuth token")
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
postgres_password = cred.token
last_password_refresh = time.time()
cparams["password"] = postgres_password
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
Scala
Negli esempi di codice viene illustrato come ottenere a livello di codice l'istanza e le credenziali del database e come connettersi a un'istanza del database usando una singola connessione o un pool di connessioni.
Passaggio 1: Usare Azure Databricks Java SDK per ottenere un token OAuth
Per informazioni dettagliate su come ottenere l'istanza del database e le credenziali a livello di codice, vedere come ottenere un token OAuth usando Java SDK.
Passaggio 2: Connettersi a un'istanza del database
Connessione singola
import java.sql.{Connection, DriverManager, ResultSet, Statement}
Class.forName("org.postgresql.Driver")
val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()
val url = f"jdbc:postgresql://${host}:${port}/${database}"
val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")
val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")
if (resultSet.next()) {
val version = resultSet.getString(1)
println(s"PostgreSQL version: $version")
}
Pool di connessioni
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection
// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)
// Create a data source
val dataSource = new HikariDataSource(config)
// Function to get a connection and execute a query
def runQuery(): Unit = {
var connection: Connection = null
try {
// Get a connection from the pool
connection = dataSource.getConnection()
// Create a statement
val statement = connection.createStatement()
// Execute a query
val resultSet = statement.executeQuery("SELECT version() AS v;")
// Process the result set
while (resultSet.next()) {
val v = resultSet.getString("v")
println(s"*******Connected to PostgreSQL database. Version: $v")
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
// Close the connection which returns it to the pool
if (connection != null) connection.close()
}
}
// Run the query
runQuery()
// Close the data source
dataSource.close()