Aracılığıyla paylaş


Veritabanı örneğine erişmek için not defteri kullanma

Önemli

Bu özellik şu bölgelerde Genel Önizleme aşamasındadır: westus, westus2, eastus, , eastus2, centralus, , southcentralus, northeurope, westeuropeaustraliaeast, brazilsouth, canadacentral, centralindia, southeastasia, . uksouth

Bu sayfa, Azure Databricks not defterleri aracılığıyla Lakebase veritabanı örneğinize erişmeyi ve Python ve Scala kullanarak sorgu çalıştırmayı gösteren kod örnekleri içerir.

Örnekler, farklı kullanım örneklerine uyacak farklı bağlantı stratejilerini kapsar:

  • Tek bağlantı: Tek bir veritabanı bağlantısının açıldığı, kullanıldığı ve kapatıldığı basit betikler için kullanılır.
  • Bağlantı havuzu: Yeniden kullanılabilir bağlantı havuzunun korunduğu yüksek eşzamanlılık iş yükleri için kullanılır.
  • M2M OAuth belirtecini döndürme: Kimlik doğrulaması için kısa ömürlü, otomatik olarak yenilenen OAuth belirteçlerini kullanır.

Aşağıdaki örnekler program aracılığıyla güvenli kimlik bilgileri oluşturur. Kimlik bilgilerini doğrudan not defterine yerleştirmekten kaçının. Databricks aşağıdaki güvenli yöntemlerden birinin kullanılmasını önerir:

Başlamadan önce

Veritabanı örneğinize erişmeden önce aşağıdaki gereksinimleri karşıladığınızdan emin olun:

  • Veritabanı örneğinde oturum açmak için karşılık gelen bir Postgres rolünüz var. Bkz. Postgres rollerini yönetme.
  • Postgres rolünüz veritabanına, şemaya veya tabloya erişmek için gerekli izinlere sahip.
  • Veritabanı örneğinde kimlik doğrulaması yapabilirsiniz. Veritabanı örneğiniz için el ile bir OAuth belirteci almanız gerekiyorsa bkz. Veritabanı örneğinde kimlik doğrulaması.

Uyarı

Özel bağlantı kullanıyorsanız tek bir kullanıcı kümesi kullanmanız gerekir.

Piton

Azure Databricks Python SDK'sı, ilgili veritabanı örneği için OAuth belirteci almak için kullanılabilir.

Aşağıdaki Python kitaplıklarını kullanarak bir Azure Databricks not defterinden veritabanı örneğine bağlanın:

  • psycopg2
  • psycopg3
  • SQLAlchemy

Önkoşullar

Aşağıdaki kod örneklerini çalıştırmadan önce, Python için Databricks SDK'sını 0.61.0 veya üzeri bir sürüme yükseltin, ardından Python'ı yeniden başlatın.

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg2

Kod örnekleri tek bir bağlantıyı ve bağlantı havuzunun kullanımını gösterir. Veritabanı örneğini ve kimlik bilgilerini program aracılığıyla alma hakkında daha fazla bilgi için bkz. Python SDK'sını kullanarak OAuth belirtecini alma.

Tek bağlantı

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

Bağlantı havuzu

import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres'
)
if connection_pool:
    print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

Kod örneği, dönen bir M2M OAuth ile bağlantı havuzunun kullanımını gösterir. Bu, generate_database_credential() kullanır. Veritabanı örneğini ve kimlik bilgilerini program aracılığıyla alma hakkında daha fazla bilgi için bkz. Python SDK'sını kullanarak OAuth belirtecini alma.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

Kod örnekleri tek bir bağlantıyı ve dönen bir M2M OAuth belirteciyle bağlantı havuzu kullanımını gösterir. Veritabanı örneğini ve kimlik bilgilerini program aracılığıyla alma hakkında daha fazla bilgi için bkz. Python SDK'sını kullanarak OAuth belirtecini alma.

Tek bağlantı

%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Bağlantı havuzu & M2M OAuth'u döndürme

%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

Kod örnekleri, veritabanı örneğinin ve kimlik bilgilerinin program aracılığıyla nasıl alınılacağını ve tek bir bağlantı veya bağlantı havuzu kullanarak bir veritabanı örneğine nasıl bağlanılacağını gösterir.

1. Adım: OAuth belirteci almak için Azure Databricks Java SDK'sını kullanma

Veritabanı örneğini ve kimlik bilgilerini program aracılığıyla edinme hakkında ayrıntılı bilgi için bkz. Java SDK'sını kullanarak OAuth belirteci alma.

2. Adım: Veritabanı örneğine bağlanma

Tek bağlantı

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

Bağlantı havuzu

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()