Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
Cette fonctionnalité est en préversion publique dans les régions suivantes : westus, , westus2eastuseastus2, , centralus, southcentralus, , northeuropewesteurope, , australiaeastbrazilsouth, , canadacentral, centralindia, , . southeastasiauksouth
Cette page contient des exemples de code qui vous montrent comment accéder à votre instance de base de données Lakebase via des notebooks Azure Databricks et exécuter des requêtes à l’aide de Python et Scala.
Les exemples couvrent différentes stratégies de connexion pour répondre à différents cas d’usage :
- Connexion unique : utilisée pour les scripts simples où une seule connexion de base de données est ouverte, utilisée et fermée.
- Pool de connexions : utilisé pour les charges de travail à haute concurrence, où un pool de connexions réutilisables est géré.
- Rotation du jeton OAuth M2M : utilise des jetons OAuth à courte durée et actualisées automatiquement pour l’authentification.
Les exemples suivants génèrent par programmation des informations d’identification sécurisées. Évitez de placer directement les informations d’identification dans un bloc-notes. Databricks recommande d’utiliser l’une des méthodes sécurisées suivantes :
- Stockez les mots de passe Postgres dans les secrets d'Azure Databricks.
- Générez des jetons OAuth à l’aide de M2M OAuth.
Avant de commencer
Veillez à répondre aux exigences suivantes avant d’accéder à votre instance de base de données :
- Vous disposez d’un rôle Postgres correspondant pour vous connecter à l’instance de base de données. Consultez Gérer les rôles Postgres.
- Votre rôle Postgres dispose des autorisations nécessaires pour accéder à la base de données, au schéma ou à la table.
- Vous pouvez vous authentifier auprès de l’instance de base de données. Si vous devez obtenir manuellement un jeton OAuth pour votre instance de base de données, consultez S’authentifier auprès d’une instance de base de données.
Avertissement
Si vous utilisez une liaison privée, vous devez utiliser un cluster utilisateur unique.
Python
Le Kit de développement logiciel (SDK) Python Azure Databricks peut être utilisé pour obtenir un jeton OAuth pour une instance de base de données respective.
Connectez-vous à votre instance de base de données à partir d’un notebook Azure Databricks à l’aide des bibliothèques Python suivantes :
psycopg2psycopg3SQLAlchemy
Prerequisites
Avant d’exécuter les exemples de code suivants, mettez à niveau le Kit de développement logiciel (SDK) Databricks pour Python vers la version 0.61.0 ou ultérieure, puis redémarrez Python.
%pip install databricks-sdk>=0.61.0
%restart_python
psycopg2
Les exemples de code illustrent une connexion unique et l’utilisation d’un pool de connexions. Pour plus d’informations sur l’obtention par programmation de l’instance de base de données et des informations d’identification, consultez comment obtenir un jeton OAuth à l’aide du Kit de développement logiciel (SDK) Python.
Connexion unique
import psycopg2
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Connection parameters
conn = psycopg2.connect(
host = instance.read_write_dns,
dbname = "databricks_postgres",
user = "<YOUR USER>",
password = cred.token,
sslmode = "require"
)
# Execute query
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()[0]
print(version)
conn.close()
Pool de connexions
import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
minconn = 1, # Minimum number of connections in the pool
maxconn = 10, # Maximum number of connections in the pool
user = "<YOUR USER>",
password = cred.token,
host = instance.read_write_dns,
port = '5432',
database = 'databricks_postgres'
)
if connection_pool:
print("Connection pool created successfully")
def executeWithPgConnection(execFn):
connection = None
try:
# Get a connection from the pool
connection = connection_pool.getconn()
if connection:
print("Successfully received a connection from the pool")
execFn(connection)
finally:
# Release the connection back to the pool
if connection:
connection_pool.putconn(connection)
print("Connection returned to the pool")
def printVersion(connection):
cursor = connection.cursor()
cursor.execute("SELECT version()")
version = cursor.fetchone()
print(f"Connected to PostgreSQL database. Version: {version}")
executeWithPgConnection(printVersion)
psycopg3
L’exemple de code illustre l’utilisation d’un pool de connexions avec un OAuth M2M pivotant. Il utilise generate_database_credential(). Pour plus d’informations sur l’obtention par programmation de l’instance de base de données et des informations d’identification, consultez comment obtenir un jeton OAuth à l’aide du Kit de développement logiciel (SDK) Python.
%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid
import psycopg
import string
from psycopg_pool import ConnectionPool
w = WorkspaceClient()
class CustomConnection(psycopg.Connection):
global w
def __init__(self, *args, **kwargs):
# Call the parent class constructor
super().__init__(*args, **kwargs)
@classmethod
def connect(cls, conninfo='', **kwargs):
# Append the new password to kwargs
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
kwargs['password'] = cred.token
# Call the superclass's connect method with updated kwargs
return super().connect(conninfo, **kwargs)
username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
pool = ConnectionPool(
conninfo=f"dbname={database} user={username} host={host}",
connection_class=CustomConnection,
min_size=1,
max_size=10,
open=True
)
with pool.connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT version()")
for record in cursor:
print(record)
SQLAlchemy
Les exemples de code illustrent une connexion unique et l’utilisation d’un pool de connexions avec un jeton OAuth M2M pivotant. Pour plus d’informations sur l’obtention par programmation de l’instance de base de données et des informations d’identification, consultez comment obtenir un jeton OAuth à l’aide du Kit de développement logiciel (SDK) Python.
Connexion unique
%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token
connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
Pool de connexions et rotation M2M OAuth
%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time
from sqlalchemy import create_engine, text, event
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()
@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
global postgres_password, last_password_refresh, host
if postgres_password is None or time.time() - last_password_refresh > 900:
print("Refreshing PostgreSQL OAuth token")
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
postgres_password = cred.token
last_password_refresh = time.time()
cparams["password"] = postgres_password
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
Scala
Les exemples de code montrent comment obtenir par programmation l’instance de base de données et les informations d’identification, et comment se connecter à une instance de base de données à l’aide d’une connexion unique ou d’un pool de connexions.
Étape 1 : Utiliser le Kit de développement logiciel (SDK) Java Azure Databricks pour obtenir un jeton OAuth
Pour plus d’informations sur l’obtention par programmation de l’instance de base de données et des informations d’identification, consultez comment obtenir un jeton OAuth à l’aide du Kit de développement logiciel (SDK) Java.
Étape 2 : Se connecter à une instance de base de données
Connexion unique
import java.sql.{Connection, DriverManager, ResultSet, Statement}
Class.forName("org.postgresql.Driver")
val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()
val url = f"jdbc:postgresql://${host}:${port}/${database}"
val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")
val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")
if (resultSet.next()) {
val version = resultSet.getString(1)
println(s"PostgreSQL version: $version")
}
Pool de connexions
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection
// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)
// Create a data source
val dataSource = new HikariDataSource(config)
// Function to get a connection and execute a query
def runQuery(): Unit = {
var connection: Connection = null
try {
// Get a connection from the pool
connection = dataSource.getConnection()
// Create a statement
val statement = connection.createStatement()
// Execute a query
val resultSet = statement.executeQuery("SELECT version() AS v;")
// Process the result set
while (resultSet.next()) {
val v = resultSet.getString("v")
println(s"*******Connected to PostgreSQL database. Version: $v")
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
// Close the connection which returns it to the pool
if (connection != null) connection.close()
}
}
// Run the query
runQuery()
// Close the data source
dataSource.close()