Поделиться через


Использование записной книжки для доступа к экземпляру базы данных

Это важно

Эта функция доступна в общедоступной предварительной версии в следующих регионах: westus, westus2eastuseastus2centralussouthcentralusnortheuropewesteuropeaustraliaeastbrazilsouthcanadacentralcentralindia, . southeastasiauksouth

На этой странице содержатся примеры кода, показывающие, как получить доступ к экземпляру базы данных Lakebase с помощью записных книжек Azure Databricks и выполнять запросы с помощью Python и Scala.

В примерах рассматриваются различные стратегии подключения для различных вариантов использования:

  • Одно подключение: используется для простых сценариев, в которых открыто, используется и закрывается одно подключение к базе данных.
  • Пул подключений: используется для рабочих нагрузок с высокой одновременностью, при которых поддерживается пул повторно используемых подключений.
  • Циклическое обновление M2M маркера OAuth: использует кратковременные, автоматически обновляемые маркеры OAuth для аутентификации.

В следующих примерах программно создаются безопасные учетные данные. Избегайте прямого размещения учетных данных в записной книжке. Databricks рекомендует использовать один из следующих безопасных методов:

  • Храните пароли Postgres в хранилищах секретов Azure Databricks.
  • Создайте маркеры OAuth с помощью M2M OAuth.

Перед тем как начать

Перед доступом к экземпляру базы данных выполните следующие требования:

  • У вас есть соответствующие права пользователя Postgres для входа в экземпляр базы данных. См. статью "Управление ролями Postgres".
  • Роль Postgres предоставляет необходимые разрешения для доступа к базе данных, схеме или таблице.
  • Вы можете аутентифицироваться в экземпляре базы данных. Если необходимо вручную получить маркер OAuth для экземпляра базы данных, ознакомьтесь с проверкой подлинности в экземпляре базы данных.

Предупреждение

Если вы используете приватный канал, необходимо использовать один пользовательский кластер.

Питон

Пакет Python SDK для Azure Databricks можно использовать для получения токена OAuth для соответствующего экземпляра базы данных.

Подключитесь к экземпляру базы данных с помощью записной книжки Azure Databricks, используя следующие библиотеки Python:

  • psycopg2
  • psycopg3
  • SQLAlchemy

Предпосылки

Перед выполнением следующих примеров кода обновите пакет SDK Databricks для Python до версии 0.61.0 или более поздней, а затем перезапустите Python.

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg2

В примерах кода демонстрируется одно подключение и использование пула соединений. Дополнительные сведения о том, как получить экземпляр базы данных и учетные данные программным способом, см. в статье о том, как получить маркер OAuth с помощью пакета SDK для Python.

Одно подключение

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

Пул подключений

import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres'
)
if connection_pool:
    print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

Пример кода демонстрирует использование пула соединений с ротацией M2M OAuth токенов. Он использует generate_database_credential(). Дополнительные сведения о том, как получить экземпляр базы данных и учетные данные программным способом, см. в статье о том, как получить маркер OAuth с помощью пакета SDK для Python.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

В примерах кода демонстрируется одно подключение и использование пула соединений с вращающимся маркером OAuth M2M. Дополнительные сведения о том, как получить экземпляр базы данных и учетные данные программным способом, см. в статье о том, как получить маркер OAuth с помощью пакета SDK для Python.

Одно подключение

%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Пул подключений и ротация M2M OAuth

%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

В примерах кода показано, как программно получить экземпляр базы данных и учетные данные, а также как подключиться к экземпляру базы данных с помощью одного подключения или пула подключений.

Шаг 1. Использование пакета SDK Java для Azure Databricks для получения маркера OAuth

Дополнительные сведения о том, как получить экземпляр базы данных и учетные данные программным способом, см. в статье о том, как получить маркер OAuth с помощью пакета SDK для Java.

Шаг 2. Подключение к экземпляру базы данных

Одно подключение

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

Пул подключений

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()