Udostępnij przez


Uzyskiwanie dostępu do wystąpienia bazy danych za pomocą notesu

Ważne

Ta funkcja jest dostępna w wersji próbnej w następujących regionach: westus, westus2, eastus, eastus2, centralus, southcentralus, northeurope, westeurope, australiaeast, brazilsouth, canadacentral, centralindia, southeastasia, uksouth.

Ta strona zawiera przykłady kodu pokazujące, jak uzyskać dostęp do wystąpienia bazy danych Lakebase za pośrednictwem notesów usługi Azure Databricks i uruchamiać zapytania przy użyciu języka Python i języka Scala.

W przykładach omówiono różne strategie połączeń, aby dopasować je do różnych przypadków użycia:

  • Pojedyncze połączenie: służy do prostych skryptów, w których jest otwierane, używane i zamykane pojedyncze połączenie z bazą danych.
  • Pula połączeń: używana w przypadku obciążeń o wysokiej współbieżności, w których jest utrzymywana pula połączeń wielokrotnego użytku.
  • Obracanie tokenu OAuth M2M: używa krótkotrwałych, automatycznie odświeżonych tokenów OAuth na potrzeby uwierzytelniania.

W poniższych przykładach poświadczenia są generowane programowo w sposób bezpieczny. Unikaj bezpośredniego umieszczania poświadczeń w notesie. Usługa Databricks zaleca użycie jednej z następujących bezpiecznych metod:

  • Przechowuj hasła Postgres w tajemnicach Azure Databricks.
  • Generowanie tokenów OAuth przy użyciu protokołu OAuth M2M.

Zanim rozpoczniesz

Przed uzyskaniem dostępu do wystąpienia bazy danych upewnij się, że spełnisz następujące wymagania:

  • Masz odpowiednią rolę Postgres, aby zalogować się do instancji bazy danych. Zobacz Zarządzanie rolami postgresu.
  • Rola Postgres ma przyznane niezbędne uprawnienia dostępu do bazy danych, schematu lub tabeli.
  • Możesz uwierzytelnić się w wystąpieniu bazy danych. Jeśli musisz ręcznie uzyskać token OAuth dla wystąpienia bazy danych, zobacz Uwierzytelnianie w wystąpieniu bazy danych.

Ostrzeżenie

Jeśli używasz łącza prywatnego, musisz użyć klastra pojedynczego użytkownika.

Python

Zestaw SDK języka Python usługi Azure Databricks może służyć do uzyskania tokenu OAuth dla odpowiedniego wystąpienia bazy danych.

Połącz się z instancją bazy danych z notatnika Azure Databricks przy użyciu następujących bibliotek Pythona:

  • psycopg2
  • psycopg3
  • SQLAlchemy

Wymagania wstępne

Przed uruchomieniem poniższych przykładów kodu uaktualnij zestaw SDK usługi Databricks dla języka Python do wersji 0.61.0 lub nowszej, a następnie uruchom ponownie język Python.

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg2

Przykłady kodu przedstawiają pojedyncze połączenie i użycie puli połączeń. Aby uzyskać więcej informacji na temat programowego uzyskiwania wystąpienia bazy danych i poświadczeń, zobacz jak uzyskać token OAuth przy użyciu zestawu SDK języka Python.

Pojedyncze połączenie

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

Pula połączeń

import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres'
)
if connection_pool:
    print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

W przykładzie kodu pokazano użycie puli połączeń z obrotowym uwierzytelnianiem OAuth M2M. Używa metody generate_database_credential(). Aby uzyskać więcej informacji na temat programowego uzyskiwania wystąpienia bazy danych i poświadczeń, zobacz jak uzyskać token OAuth przy użyciu zestawu SDK języka Python.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

Przykłady kodu przedstawiają pojedyncze połączenie i użycie puli połączeń z obrotowym tokenem OAuth M2M. Aby uzyskać więcej informacji na temat programowego uzyskiwania wystąpienia bazy danych i poświadczeń, zobacz jak uzyskać token OAuth przy użyciu zestawu SDK języka Python.

Pojedyncze połączenie

%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Pula połączeń i rotacja protokołu OAuth M2M

%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

Przykłady kodu pokazują, jak programowo uzyskać wystąpienie i poświadczenia bazy danych oraz jak nawiązać połączenie z wystąpieniem bazy danych przy użyciu jednego połączenia lub puli połączeń.

Krok 1. Uzyskiwanie tokenu OAuth przy użyciu zestawu SDK języka Java usługi Azure Databricks

Aby uzyskać szczegółowe informacje na temat programowego uzyskiwania instancji bazy danych i poświadczeń, zobacz jak uzyskać token OAuth przy użyciu Java SDK.

Krok 2: Połącz się z wystąpieniem bazy danych

Pojedyncze połączenie

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

Pula połączeń

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()