Partekatu bidez


Uso de un cuaderno para acceder a una instancia de base de datos

Importante

Esta característica se encuentra en versión preliminar pública en las siguientes regiones: westus, westus2, eastuseastus2, centralussouthcentralusnortheuropewesteuropeaustraliaeastbrazilsouthcanadacentralcentralindia, , . southeastasiauksouth

Esta página contiene ejemplos de código que muestran cómo acceder a la instancia de base de datos de Lakebase a través de cuadernos de Azure Databricks y ejecutar consultas mediante Python y Scala.

En los ejemplos se tratan diferentes estrategias de conexión para adaptarse a distintos casos de uso:

  • Conexión única: se usa para scripts simples en los que se abre, usa y cierra una conexión de base de datos única.
  • Grupo de conexiones: se usa para cargas de trabajo de alta simultaneidad, donde se mantiene un grupo de conexiones reutilizables.
  • Rotación del token de OAuth de M2M: emplea tokens de OAuth que se actualizan automáticamente para la autenticación.

En los ejemplos siguientes se generan credenciales seguras mediante programación. Evite colocar directamente las credenciales en un cuaderno. Databricks recomienda usar uno de los métodos seguros siguientes:

Antes de empezar

Asegúrese de cumplir los siguientes requisitos antes de acceder a la instancia de base de datos:

  • Tiene el correspondiente rol de Postgres para acceder a la instancia de la base de datos. Consulte Administración de roles de Postgres.
  • A tu rol de Postgres se le conceden los permisos necesarios para acceder a la base de datos, el esquema o la tabla.
  • Puede autenticarse en la instancia de base de datos. Si debe obtener manualmente un token de OAuth para la instancia de base de datos, consulte Autenticación en una instancia de base de datos.

Advertencia

Si usa private link, debe usar un único clúster de usuarios.

Pitón

El SDK de Python de Azure Databricks se puede usar para obtener un token de OAuth para una instancia de base de datos respectiva.

Conéctese a la instancia de base de datos desde un cuaderno de Azure Databricks mediante las siguientes bibliotecas de Python:

  • psycopg2
  • psycopg3
  • SQLAlchemy

Prerrequisitos

Antes de ejecutar los siguientes ejemplos de código, actualice el SDK de Databricks para Python a la versión 0.61.0 o superior y reinicie Python.

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg2

Los ejemplos de código muestran una sola conexión y el uso de un grupo de conexiones. Para obtener más información sobre cómo obtener la instancia de base de datos y las credenciales mediante programación, consulte cómo obtener un token de OAuth mediante el SDK de Python.

Conexión única

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

Grupo de conexiones

import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres'
)
if connection_pool:
    print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

En el ejemplo de código se muestra el uso de un grupo de conexiones con una OAuth de M2M giratoria. Usa generate_database_credential(). Para obtener más información sobre cómo obtener la instancia de base de datos y las credenciales mediante programación, consulte cómo obtener un token de OAuth mediante el SDK de Python.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

Los ejemplos de código muestran una sola conexión y el uso de un grupo de conexiones con un token de OAuth de M2M giratorio. Para obtener más información sobre cómo obtener la instancia de base de datos y las credenciales mediante programación, consulte cómo obtener un token de OAuth mediante el SDK de Python.

Conexión única

%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Pool de conexiones y rotación de OAuth M2M

%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

Los ejemplos de código muestran cómo obtener mediante programación la instancia de base de datos y las credenciales, y cómo conectarse a una instancia de base de datos mediante una sola conexión o un grupo de conexiones.

Paso 1: Uso del SDK de Java de Azure Databricks para obtener un token de OAuth

Para más información sobre cómo obtener la instancia de base de datos y las credenciales mediante programación, consulte cómo obtener un token de OAuth mediante el SDK de Java.

Paso 2: Conexión a una instancia de base de datos

Conexión única

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

Grupo de conexiones

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()