다음을 통해 공유


Notebook을 사용하여 데이터베이스 인스턴스 액세스

중요합니다

이 기능은 다음 지역의 공개 미리 보기에 있습니다. westuswestus2eastuseastus2centralussouthcentralusnortheuropewesteuropeaustraliaeastbrazilsouthcanadacentralcentralindiasoutheastasiauksouth

이 페이지에는 Azure Databricks Notebook 을 통해 Lakebase 데이터베이스 인스턴스에 액세스하고 Python 및 Scala를 사용하여 쿼리를 실행하는 방법을 보여 주는 코드 예제가 포함되어 있습니다.

이 예제에서는 다양한 사용 사례에 맞게 다양한 연결 전략을 다룹니다.

  • 단일 연결: 단일 데이터베이스 연결을 열고 사용하고 닫는 간단한 스크립트에 사용됩니다.
  • 연결 풀: 다시 사용할 수 있는 연결 풀이 유지 관리되는 높은 동시성 워크로드에 사용됩니다.
  • M2M OAuth 토큰 회전: 인증을 위해 수명이 짧고 자동으로 새로 고쳐진 OAuth 토큰을 사용합니다.

다음 예제에서는 프로그래밍 방식으로 보안 자격 증명을 생성합니다. 전자 필기장에 자격 증명을 직접 입력하지 마세요. Databricks는 다음 보안 방법 중 하나를 사용하는 것이 좋습니다.

시작하기 전 주의 사항:

데이터베이스 인스턴스에 액세스하기 전에 다음 요구 사항을 충족하는지 확인합니다.

  • 데이터베이스 인스턴스에 로그인할 해당 Postgres 역할이 있습니다. Postgres 역할 관리를 참조하세요.
  • Postgres 역할에는 데이터베이스, 스키마 또는 테이블에 액세스하는 데 필요한 권한이 부여됩니다.
  • 데이터베이스 인스턴스에 인증할 수 있습니다. 데이터베이스 인스턴스에 대한 OAuth 토큰을 수동으로 가져와야 하는 경우 데이터베이스 인스턴스에 대한 인증을 참조하세요.

경고

프라이빗 링크를 사용하는 경우 단일 사용자 클러스터를 사용해야 합니다.

파이썬

Azure Databricks Python SDK를 사용하여 해당 데이터베이스 인스턴스에 대한 OAuth 토큰을 가져올 수 있습니다.

다음 Python 라이브러리를 사용하여 Azure Databricks Notebook에서 데이터베이스 인스턴스에 연결합니다.

  • psycopg2
  • psycopg3
  • SQLAlchemy

필수 조건

다음 코드 예제를 실행하기 전에 Python용 Databricks SDK를 버전 0.61.0 이상으로 업그레이드한 다음 Python을 다시 시작합니다.

%pip install databricks-sdk>=0.61.0
%restart_python

psycopg2

코드 예제에서는 단일 연결 및 연결 풀의 사용을 보여 줍니다. 프로그래밍 방식으로 데이터베이스 인스턴스 및 자격 증명을 가져오는 방법에 대한 자세한 내용은 Python SDK를 사용하여 OAuth 토큰을 가져오는 방법을 참조하세요.

단일 연결

import psycopg2

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Connection parameters
conn = psycopg2.connect(
    host = instance.read_write_dns,
    dbname = "databricks_postgres",
    user = "<YOUR USER>",
    password = cred.token,
    sslmode = "require"
)

# Execute query
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    version = cur.fetchone()[0]
    print(version)
conn.close()

연결 풀

import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 1,  # Minimum number of connections in the pool
    maxconn = 10,  # Maximum number of connections in the pool
    user = "<YOUR USER>",
    password = cred.token,
    host = instance.read_write_dns,
    port = '5432',
    database = 'databricks_postgres'
)
if connection_pool:
    print("Connection pool created successfully")


def executeWithPgConnection(execFn):
    connection = None
    try:
        # Get a connection from the pool
        connection = connection_pool.getconn()
        if connection:
            print("Successfully received a connection from the pool")
            execFn(connection)
    finally:
        # Release the connection back to the pool
        if connection:
            connection_pool.putconn(connection)
            print("Connection returned to the pool")


def printVersion(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()
    print(f"Connected to PostgreSQL database. Version: {version}")

executeWithPgConnection(printVersion)

psycopg3

이 코드 예제에서는 회전하는 M2M OAuth를 사용하여 연결 풀을 사용하는 방법을 보여 줍니다. 를 사용합니다 generate_database_credential(). 프로그래밍 방식으로 데이터베이스 인스턴스 및 자격 증명을 가져오는 방법에 대한 자세한 내용은 Python SDK를 사용하여 OAuth 토큰을 가져오는 방법을 참조하세요.

%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid

import psycopg
import string
from psycopg_pool import ConnectionPool

w = WorkspaceClient()

class CustomConnection(psycopg.Connection):
    global w
    def __init__(self, *args, **kwargs):
        # Call the parent class constructor
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Append the new password to kwargs
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        kwargs['password'] = cred.token

        # Call the superclass's connect method with updated kwargs
        return super().connect(conninfo, **kwargs)


username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host}",
    connection_class=CustomConnection,
    min_size=1,
    max_size=10,
    open=True
)

with pool.connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT version()")
        for record in cursor:
            print(record)

SQLAlchemy

코드 예제에서는 단일 연결 및 회전 M2M OAuth 토큰을 사용하여 연결 풀을 사용하는 방법을 보여 줍니다. 프로그래밍 방식으로 데이터베이스 인스턴스 및 자격 증명을 가져오는 방법에 대한 자세한 내용은 Python SDK를 사용하여 OAuth 토큰을 가져오는 방법을 참조하세요.

단일 연결

%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text

from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token

connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

연결 풀 및 순환 M2M OAuth

%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time

from sqlalchemy import create_engine, text, event

w = WorkspaceClient()

instance_name = "<YOUR INSTANCE>"

instance = w.database.get_database_instance(name=instance_name)

username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"

# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()

@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    global postgres_password, last_password_refresh, host

    if postgres_password is None or time.time() - last_password_refresh > 900:
        print("Refreshing PostgreSQL OAuth token")
        cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
        postgres_password = cred.token
        last_password_refresh = time.time()

    cparams["password"] = postgres_password

with connection_pool.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    for row in result:
        print(f"Connected to PostgreSQL database. Version: {row}")

Scala

코드 예제에서는 프로그래밍 방식으로 데이터베이스 인스턴스 및 자격 증명을 가져오는 방법과 단일 연결 또는 연결 풀을 사용하여 데이터베이스 인스턴스에 연결하는 방법을 보여 줍니다.

1단계: Azure Databricks Java SDK를 사용하여 OAuth 토큰 가져오기

프로그래밍 방식으로 데이터베이스 인스턴스 및 자격 증명을 가져오는 방법에 대한 자세한 내용은 Java SDK를 사용하여 OAuth 토큰을 가져오는 방법을 참조하세요.

2단계: 데이터베이스 인스턴스에 연결

단일 연결

import java.sql.{Connection, DriverManager, ResultSet, Statement}

Class.forName("org.postgresql.Driver")

val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()

val url = f"jdbc:postgresql://${host}:${port}/${database}"

val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")

val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")

if (resultSet.next()) {
    val version = resultSet.getString(1)
    println(s"PostgreSQL version: $version")
}

연결 풀

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection

// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)

// Create a data source
val dataSource = new HikariDataSource(config)

// Function to get a connection and execute a query
def runQuery(): Unit = {
  var connection: Connection = null
  try {
    // Get a connection from the pool
    connection = dataSource.getConnection()

    // Create a statement
    val statement = connection.createStatement()

    // Execute a query
    val resultSet = statement.executeQuery("SELECT version() AS v;")

    // Process the result set
    while (resultSet.next()) {
      val v = resultSet.getString("v")
      println(s"*******Connected to PostgreSQL database. Version: $v")
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the connection which returns it to the pool
    if (connection != null) connection.close()
  }
}

// Run the query
runQuery()

// Close the data source
dataSource.close()