Sdílet prostřednictvím


Použití poznámkového bloku pro přístup k instanci databáze

Důležité

Tato funkce je ve verzi Public Preview v následujících oblastech: westus, westus2, eastus, eastus2, centralus, southcentralus, northeurope, westeurope, australiaeast, brazilsouth, canadacentral, centralindia, southeastasia, uksouth.

Tato stránka obsahuje příklady kódu, které ukazují, jak přistupovat k instanci databáze Lakebase prostřednictvím poznámkových bloků Azure Databricks a spouštět dotazy pomocí Pythonu a Scaly.

Příklady zahrnují různé strategie připojení tak, aby vyhovovaly různým případům použití:

  • Jedno připojení: Používá se pro jednoduché skripty, ve kterých je otevřeno, použito a uzavřeno jednoúčelové připojení k databázi.
  • Fond připojení: Používá se pro úlohy s vysokou souběžností, kde se udržuje fond opakovaně použitelných připojení.
  • Rotující token OAuth M2M: Používá krátkodobé, automaticky obnovované tokeny OAuth pro autentizaci.

Následující příklady generují zabezpečené přihlašovací údaje prostřednictvím kódu programu. Vyhněte se přímému vkládání přihlašovacích údajů do poznámkového bloku. Databricks doporučuje použít jednu z následujících zabezpečených metod:

Než začnete

Před přístupem k instanci databáze se ujistěte, že splňujete následující požadavky:

  • Máte odpovídající roli Postgres pro přihlášení k instanci databáze. Viz Správa rolí Postgres.
  • Vaše role Postgres má udělená potřebná oprávnění pro přístup k databázi, schématu nebo tabulce.
  • Můžete se ověřit u instance databáze. Pokud musíte token OAuth pro vaši instanci databáze získat ručně, přečtěte si téma Ověření instance databáze.

Výstraha

Pokud používáte privátní propojení, musíte použít jeden uživatelský cluster.

Python

Sadu Azure Databricks Python SDK je možné použít k získání tokenu OAuth pro příslušnou instanci databáze.

Připojte se k instanci databáze z poznámkového bloku Azure Databricks pomocí následujících knihoven Pythonu:

  • psycopg2
  • psycopg3
  • SQLAlchemy

Požadavky

Před spuštěním následujících příkladů kódu upgradujte sadu Databricks SDK pro Python na verzi 0.61.0 nebo novější a restartujte Python.

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg2

Příklady kódu ukazují jedno připojení a využití poolu připojení. Další informace o tom, jak získat instanci databáze a přihlašovací údaje prostřednictvím kódu programu, najdete v tématu získání tokenu OAuth pomocí sady Python SDK.

Jedno připojení

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

Fond připojení

import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres'
)
if connection_pool:
    print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

Příklad kódu ukazuje použití fondu připojení s rotujícím M2M OAuth. Používá generate_database_credential(). Další informace o tom, jak získat instanci databáze a přihlašovací údaje prostřednictvím kódu programu, najdete v tématu získání tokenu OAuth pomocí sady Python SDK.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

Příklady kódu ukazují jedno připojení a použití fondu připojení s rotujícím tokenem OAuth M2M. Další informace o tom, jak získat instanci databáze a přihlašovací údaje prostřednictvím kódu programu, najdete v tématu získání tokenu OAuth pomocí sady Python SDK.

Jedno připojení

%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Fond připojení a obměně M2M OAuth

%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

Příklady kódu ukazují, jak programově získat instanci databáze a přihlašovací údaje a jak se připojit k instanci databáze pomocí jediného připojení nebo fondu připojení.

Krok 1: Získání tokenu OAuth pomocí sady Azure Databricks Java SDK

Podrobnosti o tom, jak získat instanci databáze a přihlašovací údaje prostřednictvím kódu programu, najdete v tématu získání tokenu OAuth pomocí sady Java SDK.

Krok 2: Připojení k instanci databáze

Jedno připojení

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

Fond připojení

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()