Freigeben über


Verwenden eines Notizbuchs für den Zugriff auf eine Datenbankinstanz

Von Bedeutung

Dieses Feature befindet sich in der öffentlichen Vorschau in den folgenden Regionen: westus, westus2, eastus, eastus2, centralus, southcentralus, northeurope, westeurope, australiaeast, brazilsouth, canadacentral, centralindia, southeastasia, uksouth.

Diese Seite enthält Codebeispiele, die zeigen, wie Sie über Azure Databricks-Notizbücher auf Ihre Lakebase-Datenbankinstanz zugreifen und Abfragen mit Python und Scala ausführen können.

In den Beispielen werden verschiedene Verbindungsstrategien für unterschiedliche Anwendungsfälle behandelt:

  • Einzelne Verbindung: Wird für einfache Skripts verwendet, bei denen eine einzelne Datenbankverbindung geöffnet, verwendet und geschlossen wird.
  • Verbindungspool: Wird für Workloads mit hoher Parallelität verwendet, bei denen ein Pool wiederverwendbarer Verbindungen verwaltet wird.
  • Erneuerung von M2M OAuth-Token: Verwendet kurzlebige, automatisch aktualisierte OAuth-Token zur Authentifizierung.

Die folgenden Beispiele generieren programmgesteuert sichere Anmeldeinformationen. Vermeiden Sie das direkte Einfügen von Anmeldeinformationen in ein Notizbuch. Databricks empfiehlt die Verwendung einer der folgenden sicheren Methoden:

  • Speichern Sie Postgres-Kennwörter in Azure Databricks-Geheimnissen.
  • Generieren Sie OAuth-Token mit M2M OAuth.

Bevor Sie anfangen

Stellen Sie sicher, dass Sie die folgenden Anforderungen erfüllen, bevor Sie auf Ihre Datenbankinstanz zugreifen:

  • Sie verfügen über eine entsprechende Postgres-Rolle, um sich bei der Datenbankinstanz anzumelden. Siehe Verwalten von Postgres-Rollen.
  • Ihre Postgres-Rolle erhält die erforderlichen Berechtigungen für den Zugriff auf die Datenbank, das Schema oder die Tabelle.
  • Sie können sich bei der Datenbankinstanz authentifizieren. Wenn Sie ein OAuth-Token für Ihre Datenbankinstanz manuell abrufen müssen, lesen Sie " Authentifizieren bei einer Datenbankinstanz".

Warnung

Wenn Sie einen privaten Link verwenden, müssen Sie einen einzelnen Benutzercluster verwenden.

Python

Das Azure Databricks Python SDK kann verwendet werden, um ein OAuth-Token für eine entsprechende Datenbankinstanz abzurufen.

Stellen Sie mithilfe der folgenden Python-Bibliotheken eine Verbindung mit Ihrer Datenbankinstanz aus einem Azure Databricks-Notizbuch her:

  • psycopg2
  • psycopg3
  • SQLAlchemy

Voraussetzungen

Bevor Sie die folgenden Codebeispiele ausführen, aktualisieren Sie das Databricks SDK für Python auf Version 0.61.0 oder höher, und starten Sie Dann Python neu.

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg2

Die Codebeispiele veranschaulichen eine einzelne Verbindung und die Verwendung eines Verbindungspools. Weitere Informationen zum programmgesteuerten Abrufen der Datenbankinstanz und Anmeldeinformationen finden Sie unter Verwendung des Python SDK zum Abrufen eines OAuth-Tokens.

Einzelne Verbindung

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

Verbindungspool

import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres'
)
if connection_pool:
    print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

Das Codebeispiel veranschaulicht die Verwendung eines Verbindungspools mit einem sich drehenden M2M OAuth. Es verwendet generate_database_credential(). Weitere Informationen zum programmgesteuerten Abrufen der Datenbankinstanz und Anmeldeinformationen finden Sie unter Verwendung des Python SDK zum Abrufen eines OAuth-Tokens.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

Die Codebeispiele veranschaulichen eine einzelne Verbindung und die Verwendung eines Verbindungspools mit einem sich drehenden M2M OAuth-Token. Weitere Informationen zum programmgesteuerten Abrufen der Datenbankinstanz und Anmeldeinformationen finden Sie unter Verwendung des Python SDK zum Abrufen eines OAuth-Tokens.

Einzelne Verbindung

%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Verbindungspool und wechselnde M2M OAuth

%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

Die Codebeispiele zeigen, wie Sie die Datenbankinstanz und Anmeldeinformationen programmgesteuert abrufen und wie Sie mithilfe einer einzelnen Verbindung oder eines Verbindungspools eine Verbindung mit einer Datenbankinstanz herstellen.

Schritt 1: Verwenden des Java SDK für Azure Databricks zum Abrufen eines OAuth-Tokens

Ausführliche Informationen zum programmgesteuerten Abrufen der Datenbankinstanz und Anmeldeinformationen finden Sie unter Verwendung des Java SDK zum Abrufen eines OAuth-Tokens.

Schritt 2: Herstellen einer Verbindung mit einer Datenbankinstanz

Einzelne Verbindung

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

Verbindungspool

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()