Подключение внешнего приложения к Lakebase с помощью пакета SDK

В этом руководстве показано, как подключить внешние приложения к Автомасштабированию Lakebase с помощью стандартных драйверов Postgres (psycopg, pgx, JDBC) с поворотом маркера OAuth. Пакет SDK Azure Databricks используется со служебным главным идентификатором и пулом подключений, который вызывает generate_database_credential() при открытии каждого нового подключения, поэтому при каждом подключении вы получаете новый токен (время существования 60 минут). Примеры приведены для Python, Java и Go. Чтобы упростить настройку с помощью автоматического управления учетными данными, вместо этого рассмотрим Azure Databricks Apps .

Что вы создадите: Шаблон подключения, использующий ротацию токенов OAuth для подключения к масштабированию Lakebase из внешнего приложения, а затем убедитесь в том, что подключение работает.

Вам нужен пакет SDK Databricks (Python v0.89.0+, Java v0.73.0+ или Go v0.109.0+). Последовательно выполните следующие шаги.

:::tip Другие языки Для языков без поддержки SDK Databricks (Node.js, Ruby, PHP, Elixir, Rust и т. д.), см. статью "Подключение внешнего приложения к Lakebase с помощью API". :::

Принцип работы

Пакет SDK Databricks упрощает проверку подлинности OAuth, обрабатывая управление маркерами рабочей области автоматически:

Поток OAuth пакета SDK

Ваше приложение вызывает generate_database_credential() с параметром конечной точки. Пакет SDK получает маркер OAuth рабочей области (без необходимости написания кода), запрашивает учетные данные базы данных из API Lakebase и возвращает их вашему приложению. Затем вы используете эти учетные данные в качестве пароля при подключении к Postgres.

Срок действия маркера OAuth для рабочего пространства и учетных данных базы данных истекает через 60 минут. Пулы подключений обрабатывают автоматическое обновление путем вызова generate_database_credential() при создании новых подключений.

1. Создание субъекта-службы с помощью секрета OAuth

Создайте учетную запись службы Azure Databricks с секретным кодом OAuth. Полные сведения см. в разделе "Авторизация доступа субъекта-службы". Для создания внешнего приложения помните:

  • Задайте срок действия вашего ключа до 730 дней. Это определяет, как часто необходимо обновлять секрет, который используется для создания учетных данных базы данных посредством ротации.
  • Включите "Доступ к рабочей области" для субъекта-службы (параметры → удостоверение и доступ → субъекты-службы → {name} вкладка "Конфигурации →"). Для создания новых учетных данных для базы данных это требуется.
  • Обратите внимание на идентификатор клиента (UUID). Вы используете его при создании соответствующей роли Postgres в настройках вашего приложения и для PGUSER.

2. Создание роли Postgres для субъекта-службы

Создайте роль OAuth для субъекта-службы. Это можно сделать в пользовательском интерфейсе Lakebase (с помощью вкладки OAuth диалогового окна "Добавление роли") или в редакторе SQL Lakebase с помощью идентификатора клиента из шага 1 (а не отображаемого имени; имя роли учитывает регистр):

-- Enable the auth extension (if not already enabled)
CREATE EXTENSION IF NOT EXISTS databricks_auth;

-- Create OAuth role using the service principal client ID
SELECT databricks_create_role('{client-id}', 'SERVICE_PRINCIPAL');

-- Grant database permissions
GRANT CONNECT ON DATABASE databricks_postgres TO "{client-id}";
GRANT USAGE ON SCHEMA public TO "{client-id}";
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO "{client-id}";
ALTER DEFAULT PRIVILEGES IN SCHEMA public
  GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO "{client-id}";

Замените {client-id} идентификатором клиента сервисного принципала. См. статью "Создание ролей OAuth".

3. Получение сведений о подключении

В вашем проекте в консоли Lakebase нажмите Подключиться, выберите ветвь и конечную точку, и запишите узел, базу данных (обычно databricks_postgres) и имя конечной точки (формат: projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>).

Или используйте интерфейс командной строки:

databricks postgres list-endpoints projects/<project-id>/branches/<branch-id>

См. строки подключения для получения дополнительных сведений.

4. Задание переменных среды

Задайте эти переменные среды перед запуском приложения:

# Databricks workspace authentication
export DATABRICKS_HOST="https://your-workspace.databricks.com"
export DATABRICKS_CLIENT_ID="<service-principal-client-id>"
export DATABRICKS_CLIENT_SECRET="<your-oauth-secret>"

# Lakebase connection details (from step 3)
export ENDPOINT_NAME="projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>"
export PGHOST="<endpoint-id>.database.<region>.cloud.databricks.com"
export PGDATABASE="databricks_postgres"
export PGUSER="<service-principal-client-id>"   # Same UUID as step 1
export PGPORT="5432"
export PGSSLMODE="require"   # Python only

5. Добавление кода подключения

Питон

В этом примере используется psycopg3 с пользовательским классом подключения, который генерирует свежий токен при создании каждого нового подключения.

import os
from databricks.sdk import WorkspaceClient
import psycopg
from psycopg_pool import ConnectionPool

# Initialize Databricks SDK
workspace_client = None

def _get_workspace_client():
    """Get or create the workspace client for OAuth."""
    global workspace_client
    if workspace_client is None:
        workspace_client = WorkspaceClient(
            host=os.environ["DATABRICKS_HOST"],
            client_id=os.environ["DATABRICKS_CLIENT_ID"],
            client_secret=os.environ["DATABRICKS_CLIENT_SECRET"],
        )
    return workspace_client

def _get_endpoint_name():
    """Get endpoint name from environment."""
    name = os.environ.get("ENDPOINT_NAME")
    if not name:
        raise ValueError(
            "ENDPOINT_NAME must be set (format: projects/<id>/branches/<id>/endpoints/<id>)"
        )
    return name

class OAuthConnection(psycopg.Connection):
    """Custom connection class that generates a fresh OAuth token per connection."""

    @classmethod
    def connect(cls, conninfo="", **kwargs):
        endpoint_name = _get_endpoint_name()
        client = _get_workspace_client()
        # Generate database credential (tokens are workspace-scoped)
        credential = client.postgres.generate_database_credential(
            endpoint=endpoint_name
        )
        kwargs["password"] = credential.token
        return super().connect(conninfo, **kwargs)

# Create connection pool with OAuth token rotation
def get_connection_pool():
    """Get or create the connection pool."""
    database = os.environ["PGDATABASE"]
    user = os.environ["PGUSER"]
    host = os.environ["PGHOST"]
    port = os.environ.get("PGPORT", "5432")
    sslmode = os.environ.get("PGSSLMODE", "require")

    conninfo = f"dbname={database} user={user} host={host} port={port} sslmode={sslmode}"

    return ConnectionPool(
        conninfo=conninfo,
        connection_class=OAuthConnection,
        min_size=1,
        max_size=10,
        open=True,
    )

# Use the pool in your application
pool = get_connection_pool()
with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT current_user, current_database()")
        print(cur.fetchone())

Зависимости:databricks-sdk>=0.89.0, psycopg[binary,pool]>=3.1.0

Go

В этом примере используется pgxpool с обратным вызовом BeforeConnect, который создает новый токен для каждого нового подключения.

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/service/postgres"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

func createConnectionPool(ctx context.Context) (*pgxpool.Pool, error) {
	// Initialize Databricks workspace client
	w, err := databricks.NewWorkspaceClient(&databricks.Config{
		Host:         os.Getenv("DATABRICKS_HOST"),
		ClientID:     os.Getenv("DATABRICKS_CLIENT_ID"),
		ClientSecret: os.Getenv("DATABRICKS_CLIENT_SECRET"),
	})
	if err != nil {
		return nil, err
	}

	// Build connection string
	connStr := fmt.Sprintf("host=%s port=%s dbname=%s user=%s sslmode=require",
		os.Getenv("PGHOST"),
		os.Getenv("PGPORT"),
		os.Getenv("PGDATABASE"),
		os.Getenv("PGUSER"))

	config, err := pgxpool.ParseConfig(connStr)
	if err != nil {
		return nil, err
	}

	// Configure pool
	config.MaxConns = 10
	config.MinConns = 1
	config.MaxConnLifetime = 45 * time.Minute
	config.MaxConnIdleTime = 15 * time.Minute

	// Generate fresh token for each new connection
	config.BeforeConnect = func(ctx context.Context, connConfig *pgx.ConnConfig) error {
		credential, err := w.Postgres.GenerateDatabaseCredential(ctx,
			postgres.GenerateDatabaseCredentialRequest{
				Endpoint: os.Getenv("ENDPOINT_NAME"),
			})
		if err != nil {
			return err
		}
		connConfig.Password = credential.Token
		return nil
	}

	return pgxpool.NewWithConfig(ctx, config)
}

func main() {
	ctx := context.Background()
	pool, err := createConnectionPool(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	var user, database string
	err = pool.QueryRow(ctx, "SELECT current_user, current_database()").Scan(&user, &database)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Connected as: %s to database: %s\n", user, database)
}

Зависимости: Пакет SDK Databricks для Go v0.109.0+ (github.com/databricks/databricks-sdk-go), драйвер pgx (github.com/jackc/pgx/v5)

Примечание: Обратный BeforeConnect вызов обеспечивает новые маркеры OAuth для каждого нового подключения, обрабатывая автоматическую смену маркеров для длительных приложений.

Ява

В этом примере используется JDBC с HikariCP и настраиваемый DataSource, который генерирует новый токен при создании пулом каждого нового подключения.

import java.sql.*;
import javax.sql.DataSource;
import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.core.DatabricksConfig;
import com.databricks.sdk.service.postgres.*;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class LakebaseConnection {

    private static WorkspaceClient workspaceClient() {
        String host = System.getenv("DATABRICKS_HOST");
        String clientId = System.getenv("DATABRICKS_CLIENT_ID");
        String clientSecret = System.getenv("DATABRICKS_CLIENT_SECRET");

        return new WorkspaceClient(new DatabricksConfig()
            .setHost(host)
            .setClientId(clientId)
            .setClientSecret(clientSecret));
    }

    private static DataSource createDataSource() {
        WorkspaceClient w = workspaceClient();
        String endpointName = System.getenv("ENDPOINT_NAME");
        String host = System.getenv("PGHOST");
        String database = System.getenv("PGDATABASE");
        String user = System.getenv("PGUSER");
        String port = System.getenv().getOrDefault("PGPORT", "5432");

        String jdbcUrl = "jdbc:postgresql://" + host + ":" + port +
                         "/" + database + "?sslmode=require";

        // DataSource that returns a new connection with a fresh token (tokens are workspace-scoped)
        DataSource tokenDataSource = new DataSource() {
            @Override
            public Connection getConnection() throws SQLException {
                DatabaseCredential cred = w.postgres().generateDatabaseCredential(
                    new GenerateDatabaseCredentialRequest().setEndpoint(endpointName)
                );
                return DriverManager.getConnection(jdbcUrl, user, cred.getToken());
            }

            @Override
            public Connection getConnection(String u, String p) {
                throw new UnsupportedOperationException();
            }
            // ... other DataSource methods (getLogWriter, etc.)
        };

        // Wrap in HikariCP for connection pooling
        HikariConfig config = new HikariConfig();
        config.setDataSource(tokenDataSource);
        config.setMaximumPoolSize(10);
        config.setMinimumIdle(1);
        // Recycle connections before 60-min token expiry
        config.setMaxLifetime(45 * 60 * 1000L);

        return new HikariDataSource(config);
    }

    public static void main(String[] args) throws SQLException {
        DataSource pool = createDataSource();

        try (Connection conn = pool.getConnection();
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT current_user, current_database()")) {
            if (rs.next()) {
                System.out.println("User: " + rs.getString(1));
                System.out.println("Database: " + rs.getString(2));
            }
        }
    }
}

Зависимости: Пакет SDK Databricks для Java v0.73.0+ (com.databricks:databricks-sdk-java), драйвер JDBC PostgreSQL (), HikariCP (org.postgresql:postgresqlcom.zaxxer:HikariCP)

6. Запуск и проверка подключения

Питон

Установите зависимости:

pip install databricks-sdk psycopg[binary,pool]

Запустить:

# Save all the code from step 5 (above) as db.py, then run:
from db import get_connection_pool

pool = get_connection_pool()
with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT current_user, current_database()")
        print(cur.fetchone())

Ожидаемые выходные данные:

('c00f575e-d706-4f6b-b62c-e7a14850571b', 'databricks_postgres')

Если current_user соответствует идентификатору клиента служебного принципала с шага 1, обновление токена OAuth работает.

Ява

Примечание: Предполагается, что у вас есть проект Maven с зависимостями из приведенного выше примера Java.pom.xml

Установите зависимости:

mvn install

Запустить:

mvn exec:java -Dexec.mainClass="com.example.LakebaseConnection"

Ожидаемые выходные данные:

User: c00f575e-d706-4f6b-b62c-e7a14850571b
Database: databricks_postgres

Если пользователь соответствует идентификатору клиента основного компонента службы из шага 1, обновление токена OAuth работает.

Go

Установите зависимости:

go mod init myapp
go get github.com/databricks/databricks-sdk-go
go get github.com/jackc/pgx/v5

Запустить:

go run main.go

Ожидаемые выходные данные:

Connected as: c00f575e-d706-4f6b-b62c-e7a14850571b to database: databricks_postgres

Если пользователь соответствует идентификатору клиента основного компонента службы из шага 1, обновление токена OAuth работает.

Примечание: Первое подключение после простоя может занять больше времени, так как автомасштабирование Lakebase начинает вычисление с нуля.

Устранение неполадок

Ошибка Исправить
"API отключен для пользователей без прав доступа к рабочей области" Включите "Доступ к рабочей области" для субъекта-службы (шаг 1).
Роль не существует или проверка подлинности завершается ошибкой Создайте роль OAuth с помощью SQL (шаг 2), а не пользовательского интерфейса.
"Подключение отказано" или "Конечная точка не найдена" Используйте ENDPOINT_NAME формат projects/<id>/branches/<id>/endpoints/<id>; идентификатор конечной точки находится в хосте.
"Недопустимый пользователь" или "Пользователь не найден" Задайте PGUSER идентификатор клиента субъекта-службы (UUID), а не отображаемое имя.