Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Önemli
Bu özellik şu bölgelerde Genel Önizleme aşamasındadır: westus, westus2, eastus, , eastus2, centralus, , southcentralus, northeurope, westeuropeaustraliaeast, brazilsouth, canadacentral, centralindia, southeastasia, . uksouth
Bu sayfa, Azure Databricks not defterleri aracılığıyla Lakebase veritabanı örneğinize erişmeyi ve Python ve Scala kullanarak sorgu çalıştırmayı gösteren kod örnekleri içerir.
Örnekler, farklı kullanım örneklerine uyacak farklı bağlantı stratejilerini kapsar:
- Tek bağlantı: Tek bir veritabanı bağlantısının açıldığı, kullanıldığı ve kapatıldığı basit betikler için kullanılır.
- Bağlantı havuzu: Yeniden kullanılabilir bağlantı havuzunun korunduğu yüksek eşzamanlılık iş yükleri için kullanılır.
- M2M OAuth belirtecini döndürme: Kimlik doğrulaması için kısa ömürlü, otomatik olarak yenilenen OAuth belirteçlerini kullanır.
Aşağıdaki örnekler program aracılığıyla güvenli kimlik bilgileri oluşturur. Kimlik bilgilerini doğrudan not defterine yerleştirmekten kaçının. Databricks aşağıdaki güvenli yöntemlerden birinin kullanılmasını önerir:
- Postgres parolalarını Azure Databricks gizli dizilerinde depolayın.
- M2M OAuth kullanarak OAuth belirteçleri oluşturun.
Başlamadan önce
Veritabanı örneğinize erişmeden önce aşağıdaki gereksinimleri karşıladığınızdan emin olun:
- Veritabanı örneğinde oturum açmak için karşılık gelen bir Postgres rolünüz var. Bkz. Postgres rollerini yönetme.
- Postgres rolünüz veritabanına, şemaya veya tabloya erişmek için gerekli izinlere sahip.
- Veritabanı örneğinde kimlik doğrulaması yapabilirsiniz. Veritabanı örneğiniz için el ile bir OAuth belirteci almanız gerekiyorsa bkz. Veritabanı örneğinde kimlik doğrulaması.
Uyarı
Özel bağlantı kullanıyorsanız tek bir kullanıcı kümesi kullanmanız gerekir.
Piton
Azure Databricks Python SDK'sı, ilgili veritabanı örneği için OAuth belirteci almak için kullanılabilir.
Aşağıdaki Python kitaplıklarını kullanarak bir Azure Databricks not defterinden veritabanı örneğine bağlanın:
psycopg2psycopg3SQLAlchemy
Önkoşullar
Aşağıdaki kod örneklerini çalıştırmadan önce, Python için Databricks SDK'sını 0.61.0 veya üzeri bir sürüme yükseltin, ardından Python'ı yeniden başlatın.
%pip install databricks-sdk>=0.61.0
%restart_python
psycopg2
Kod örnekleri tek bir bağlantıyı ve bağlantı havuzunun kullanımını gösterir. Veritabanı örneğini ve kimlik bilgilerini program aracılığıyla alma hakkında daha fazla bilgi için bkz. Python SDK'sını kullanarak OAuth belirtecini alma.
Tek bağlantı
import psycopg2
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Connection parameters
conn = psycopg2.connect(
host = instance.read_write_dns,
dbname = "databricks_postgres",
user = "<YOUR USER>",
password = cred.token,
sslmode = "require"
)
# Execute query
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()[0]
print(version)
conn.close()
Bağlantı havuzu
import psycopg2
from psycopg2 import sql, pool
from pyspark.sql.functions import udf
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
# Create a connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
minconn = 1, # Minimum number of connections in the pool
maxconn = 10, # Maximum number of connections in the pool
user = "<YOUR USER>",
password = cred.token,
host = instance.read_write_dns,
port = '5432',
database = 'databricks_postgres'
)
if connection_pool:
print("Connection pool created successfully")
def executeWithPgConnection(execFn):
connection = None
try:
# Get a connection from the pool
connection = connection_pool.getconn()
if connection:
print("Successfully received a connection from the pool")
execFn(connection)
finally:
# Release the connection back to the pool
if connection:
connection_pool.putconn(connection)
print("Connection returned to the pool")
def printVersion(connection):
cursor = connection.cursor()
cursor.execute("SELECT version()")
version = cursor.fetchone()
print(f"Connected to PostgreSQL database. Version: {version}")
executeWithPgConnection(printVersion)
psycopg3
Kod örneği, dönen bir M2M OAuth ile bağlantı havuzunun kullanımını gösterir. Bu, generate_database_credential() kullanır. Veritabanı örneğini ve kimlik bilgilerini program aracılığıyla alma hakkında daha fazla bilgi için bkz. Python SDK'sını kullanarak OAuth belirtecini alma.
%pip install "psycopg[binary,pool]"
from databricks.sdk import WorkspaceClient
import uuid
import psycopg
import string
from psycopg_pool import ConnectionPool
w = WorkspaceClient()
class CustomConnection(psycopg.Connection):
global w
def __init__(self, *args, **kwargs):
# Call the parent class constructor
super().__init__(*args, **kwargs)
@classmethod
def connect(cls, conninfo='', **kwargs):
# Append the new password to kwargs
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
kwargs['password'] = cred.token
# Call the superclass's connect method with updated kwargs
return super().connect(conninfo, **kwargs)
username = "<YOUR USER>"
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
pool = ConnectionPool(
conninfo=f"dbname={database} user={username} host={host}",
connection_class=CustomConnection,
min_size=1,
max_size=10,
open=True
)
with pool.connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT version()")
for record in cursor:
print(record)
SQLAlchemy
Kod örnekleri tek bir bağlantıyı ve dönen bir M2M OAuth belirteciyle bağlantı havuzu kullanımını gösterir. Veritabanı örneğini ve kimlik bilgilerini program aracılığıyla alma hakkında daha fazla bilgi için bkz. Python SDK'sını kullanarak OAuth belirtecini alma.
Tek bağlantı
%pip install sqlalchemy==1.4 psycopg[binary]
from sqlalchemy import create_engine, text
from databricks.sdk import WorkspaceClient
import uuid
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
user = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
password = cred.token
connection_pool = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require")
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
Bağlantı havuzu & M2M OAuth'u döndürme
%pip install sqlalchemy==1.4 psycopg[binary]
from databricks.sdk import WorkspaceClient
import uuid
import time
from sqlalchemy import create_engine, text, event
w = WorkspaceClient()
instance_name = "<YOUR INSTANCE>"
instance = w.database.get_database_instance(name=instance_name)
username = "<YOUR USER>"
host = instance.read_write_dns
port = 5432
database = "databricks_postgres"
# sqlalchemy setup + function to refresh the OAuth token that is used as the Postgres password every 15 minutes.
connection_pool = create_engine(f"postgresql+psycopg2://{username}:@{host}:{port}/{database}")
postgres_password = None
last_password_refresh = time.time()
@event.listens_for(connection_pool, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
global postgres_password, last_password_refresh, host
if postgres_password is None or time.time() - last_password_refresh > 900:
print("Refreshing PostgreSQL OAuth token")
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])
postgres_password = cred.token
last_password_refresh = time.time()
cparams["password"] = postgres_password
with connection_pool.connect() as conn:
result = conn.execute(text("SELECT version()"))
for row in result:
print(f"Connected to PostgreSQL database. Version: {row}")
Scala
Kod örnekleri, veritabanı örneğinin ve kimlik bilgilerinin program aracılığıyla nasıl alınılacağını ve tek bir bağlantı veya bağlantı havuzu kullanarak bir veritabanı örneğine nasıl bağlanılacağını gösterir.
1. Adım: OAuth belirteci almak için Azure Databricks Java SDK'sını kullanma
Veritabanı örneğini ve kimlik bilgilerini program aracılığıyla edinme hakkında ayrıntılı bilgi için bkz. Java SDK'sını kullanarak OAuth belirteci alma.
2. Adım: Veritabanı örneğine bağlanma
Tek bağlantı
import java.sql.{Connection, DriverManager, ResultSet, Statement}
Class.forName("org.postgresql.Driver")
val user = "<YOUR USER>"
val host = instance.getName()
val port = "5432"
val database = "databricks_postgres"
val password = cred.getToken()
val url = f"jdbc:postgresql://${host}:${port}/${database}"
val connection = DriverManager.getConnection(url, user, password)
println("Connected to PostgreSQL database!")
val statement = connection.createStatement()
val resultSet = statement.executeQuery("SELECT version()")
if (resultSet.next()) {
val version = resultSet.getString(1)
println(s"PostgreSQL version: $version")
}
Bağlantı havuzu
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import java.sql.Connection
// Configure HikariCP
val config = new HikariConfig()
config.setJdbcUrl("jdbc:postgresql://instance.getName():5432/databricks_postgres")
config.setUsername("<YOUR USER>")
config.setPassword(cred.getToken())
config.setMaximumPoolSize(10)
// Create a data source
val dataSource = new HikariDataSource(config)
// Function to get a connection and execute a query
def runQuery(): Unit = {
var connection: Connection = null
try {
// Get a connection from the pool
connection = dataSource.getConnection()
// Create a statement
val statement = connection.createStatement()
// Execute a query
val resultSet = statement.executeQuery("SELECT version() AS v;")
// Process the result set
while (resultSet.next()) {
val v = resultSet.getString("v")
println(s"*******Connected to PostgreSQL database. Version: $v")
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
// Close the connection which returns it to the pool
if (connection != null) connection.close()
}
}
// Run the query
runQuery()
// Close the data source
dataSource.close()