Partager via


Utiliser un notebook pour accéder à une instance de base de données

Important

Cette fonctionnalité est en préversion publique dans les régions suivantes : westus, , westus2eastuseastus2, , centralus, southcentralus, , northeuropewesteurope, , australiaeastbrazilsouth, , canadacentral, centralindia, , . southeastasiauksouth

Cette page contient des exemples de code qui vous montrent comment accéder à votre instance de base de données Lakebase via des notebooks Azure Databricks et exécuter des requêtes à l’aide de Python et Scala.

Les exemples couvrent différentes stratégies de connexion pour répondre à différents cas d’usage :

  • Connexion unique : utilisée pour les scripts simples où une seule connexion de base de données est ouverte, utilisée et fermée.
  • Pool de connexions : utilisé pour les charges de travail à haute concurrence, où un pool de connexions réutilisables est géré.
  • Rotation du jeton OAuth M2M : utilise des jetons OAuth à courte durée et actualisées automatiquement pour l’authentification.

Les exemples suivants génèrent par programmation des informations d’identification sécurisées. Évitez de placer directement les informations d’identification dans un bloc-notes. Databricks recommande d’utiliser l’une des méthodes sécurisées suivantes :

  • Stockez les mots de passe Postgres dans les secrets d'Azure Databricks.
  • Générez des jetons OAuth à l’aide de M2M OAuth.

Avant de commencer

Veillez à répondre aux exigences suivantes avant d’accéder à votre instance de base de données :

  • Vous disposez d’un rôle Postgres correspondant pour vous connecter à l’instance de base de données. Consultez Gérer les rôles Postgres.
  • Votre rôle Postgres dispose des autorisations nécessaires pour accéder à la base de données, au schéma ou à la table.
  • Vous pouvez vous authentifier auprès de l’instance de base de données. Si vous devez obtenir manuellement un jeton OAuth pour votre instance de base de données, consultez S’authentifier auprès d’une instance de base de données.

Avertissement

Si vous utilisez une liaison privée, vous devez utiliser un cluster utilisateur unique.

Python

Le Kit de développement logiciel (SDK) Python Azure Databricks peut être utilisé pour obtenir un jeton OAuth pour une instance de base de données respective.

Connectez-vous à votre instance de base de données à partir d’un notebook Azure Databricks à l’aide des bibliothèques Python suivantes :

  • psycopg2
  • psycopg3
  • SQLAlchemy

Prerequisites

Avant d’exécuter les exemples de code suivants, mettez à niveau le Kit de développement logiciel (SDK) Databricks pour Python vers la version 0.61.0 ou ultérieure, puis redémarrez Python.

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg2

Les exemples de code illustrent une connexion unique et l’utilisation d’un pool de connexions. Pour plus d’informations sur l’obtention par programmation de l’instance de base de données et des informations d’identification, consultez comment obtenir un jeton OAuth à l’aide du Kit de développement logiciel (SDK) Python.

Connexion unique

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

Pool de connexions

import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres'
)
if connection_pool:
    print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

L’exemple de code illustre l’utilisation d’un pool de connexions avec un OAuth M2M pivotant. Il utilise generate_database_credential(). Pour plus d’informations sur l’obtention par programmation de l’instance de base de données et des informations d’identification, consultez comment obtenir un jeton OAuth à l’aide du Kit de développement logiciel (SDK) Python.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

Les exemples de code illustrent une connexion unique et l’utilisation d’un pool de connexions avec un jeton OAuth M2M pivotant. Pour plus d’informations sur l’obtention par programmation de l’instance de base de données et des informations d’identification, consultez comment obtenir un jeton OAuth à l’aide du Kit de développement logiciel (SDK) Python.

Connexion unique

%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Pool de connexions et rotation M2M OAuth

%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

Les exemples de code montrent comment obtenir par programmation l’instance de base de données et les informations d’identification, et comment se connecter à une instance de base de données à l’aide d’une connexion unique ou d’un pool de connexions.

Étape 1 : Utiliser le Kit de développement logiciel (SDK) Java Azure Databricks pour obtenir un jeton OAuth

Pour plus d’informations sur l’obtention par programmation de l’instance de base de données et des informations d’identification, consultez comment obtenir un jeton OAuth à l’aide du Kit de développement logiciel (SDK) Java.

Étape 2 : Se connecter à une instance de base de données

Connexion unique

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

Pool de connexions

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()