Поделиться через


Выполнение запросов с помощью драйвера JDBC

На этой странице содержатся примеры, показывающие, как выполнять запросы с помощью драйвера JDBC Databricks версии 3 и выше.

Замечание

Драйвер JDBC Databricks имеет ограничение параметров 256 для параметризованных инструкций.

Пример. Выполнение запроса

В следующем примере показано, как использовать драйвер JDBC Databricks для выполнения sql-запроса Databricks с помощью вычислительного ресурса Azure Databricks.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.Properties;

public class DatabricksJDBCExample {

    public static void main(String[] args) {

        Class.forName("com.databricks.client.jdbc.Driver");

        // Set JDBC URL properties
        String jdbcUrl = "jdbc:databricks://dbc-a1b2345c-d6e7.cloud.databricks.com:443";
        Properties connectionProperties = new Properties();
        connectionProperties.put("httpPath", "sql/protocolv1/o/123456780012345/0123-123450-z000pi22");
        connectionProperties.put("ssl", "1");

        // Set authentication properties (personal access token)
        connectionProperties.put("AuthMech", "3");
        connectionProperties.put("user", "token");
        connectionProperties.put("password", "12345678901234667890abcdabcd");

        // Set logging properties
        connectionProperties.put("logPath", "logs/myapplication.log");

        // Establish connection and execute query
        try (Connection connection = DriverManager.getConnection(jdbcUrl, connectionProperties);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("SELECT * FROM samples.nyctaxi.trips")) {

            // Get metadata and column names
            ResultSetMetaData metaData = resultSet.getMetaData();
            String[] columns = new String[metaData.getColumnCount()];
            for (int i = 0; i < columns.length; i++) {
                columns[i] = metaData.getColumnName(i + 1);
            }

            // Process and print the result set
            while (resultSet.next()) {
                System.out.print("Row " + resultSet.getRow() + "=[");
                for (int i = 0; i < columns.length; i++) {
                    if (i != 0) {
                        System.out.print(", ");
                    }
                    System.out.print(columns[i] + "='" + resultSet.getObject(i + 1) + "'");
                }
                System.out.println("]");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Пример. Асинхронное выполнение запроса

В следующих примерах показано, как использовать драйвер JDBC Databricks для выполнения и обработки асинхронного sql-запроса Databricks.

Для справки по API см. справочник Java API для драйвера JDBC Databricks.

Инициируйте асинхронное выполнение инструкции:

Statement statement = conn.createStatement();
IDatabricksStatement dbStatement = statement.unwrap(IDatabricksStatement.class);

ResultSet result = dbStatement.executeAsync(sql);
IDatabricksResultSet asyncResult = result.unwrap(IDatabricksResultSet.class);
IExecutionStatus asyncStatus = asyncResult.getExecutionStatus();
long startTime = System.currentTimeMillis();
while ((asyncStatus.getExecutionState() == ExecutionState.RUNNING |  asyncStatus.getExecutionState() == ExecutionState.PENDING) || (startTime + timeout < System.currentTimeMillis())) {
  Thread.sleep(1000); // Sleep for 1000 ms
  asyncResult = dbStatement.getExecutionResult().unwrap(IDatabricksResultSet.class);
  asyncStatus = asyncResult.getExecutionStatus();
}

if (asyncStatus.getExecutionStatus() == ExecutionState.RUNNING | ExecutionState.PENDING) {
  dbStatement.cancel();
}

if (asyncStatus.getExecutionStatus() == ExecutionState.SUCCEEDED) {
  // process result set
}

if (asyncStatus.getExecutionStatus() == ExecutionState.FAILED) {
  String sqlState = asyncStatus.getSqlState();
  String errorMessage = asyncStatus.getErrorMessage();
  // log error code and message
}

Обработка заявления в отдельном потоке:

Statement statement = conn1.createStatement();
IDatabricksStatement dbStatement = statement.unwrap(IDatabricksStatement.class);

ResultSet asyncResult = dbStatement.executeAsync(sql);
IDatabricksResultSet drs = asyncResult.unwrap(IDatabricksResultSet.class);
String statementId = drs.getStatementId();

ExecutionState state = drs.getExecutionStatus().getExecutionState();

while (state != ExecutionState.SUCCEEDED) {
  Thread.sleep(sleepInterval);
  asyncResult = dbStatement.getExecutionResult();
  state = asyncResult.unwrap(IDatabricksResultSet.class).getExecutionStatus().getExecutionState();
}


// In another thread
IDatabricksConnection dbConn2 = conn2.unwrap(IDatabricksConnection.class);
IDatabricksStatement asyncStatementHandle = dbConn2.getStatement(statementId).unwrap(IDatabricksStatement.class);
IDatabricksResultSet asyncResultHandle = asyncStatementHandle.getExecutionResult().unwrap(IDatabricksResultSet.class);

// Cancel if needed
if (asyncResultHandle.getExecutionStatus().getExecutionState() == ExecutionState.PENDING | asyncResultHandle.getExecutionStatus().getExecutionState() == ExecutionState.RUNNING) {
  asyncStatementHandle.cancel();
}

Закройте подключение с помощью идентификатора подключения:


// Get connection-Id from existing connection
String connectionId = conn.unwrap(IDatabricksConnection.class).getConnectionId();

// Close the connection from other thread using same JDBC Url and connection properties and connection-Id retrieved from above
com.databricks.client.jdbc.Driver.getInstance().closeConnection(jdbcUrl, properties, connectionId);

Пример. Запрос геопространственных данных

В следующем примере показано, как запрашивать и извлекать геопространственные типы данных с помощью драйвера JDBC Databricks. Чтобы получить геопространственные данные в виде структурированных объектов Java, включите свойства подключения EnableComplexDatatypeSupport и EnableGeoSpatialSupport.

Дополнительные сведения о геопространственных типах данных и функциях см. в разделе "Геопространственные функции ST".

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.Properties;
import com.databricks.jdbc.api.IGeometry;
import com.databricks.jdbc.api.IGeography;

public class GeospatialExample {

    public static void main(String[] args) {

        // Set JDBC URL properties
        String jdbcUrl = "jdbc:databricks://dbc-a1b2345c-d6e7.cloud.databricks.com:443";
        Properties connectionProperties = new Properties();
        connectionProperties.put("httpPath", "sql/protocolv1/o/123456780012345/0123-123450-z000pi22");
        connectionProperties.put("ssl", "1");

        // Set authentication properties (personal access token)
        connectionProperties.put("AuthMech", "3");
        connectionProperties.put("user", "token");
        connectionProperties.put("password", "12345678901234667890abcdabcd");

        // Enable geospatial support
        connectionProperties.put("EnableComplexDatatypeSupport", "1");
        connectionProperties.put("EnableGeoSpatialSupport", "1");

        // Establish connection and execute geospatial query
        try (Connection connection = DriverManager.getConnection(jdbcUrl, connectionProperties);
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery(
                 "SELECT ST_Point(1.0, 2.0) as point, " +
                 "ST_GeogFromText('POINT(-122.4194 37.7749)') as location")) {

            while (rs.next()) {
                // Retrieve GEOMETRY object
                IGeometry point = (IGeometry) rs.getObject("point");
                System.out.println("Point WKT: " + point.getWKT());
                System.out.println("Point SRID: " + point.getSRID());

                // Retrieve GEOGRAPHY object
                IGeography location = (IGeography) rs.getObject("location");
                System.out.println("Location WKT: " + location.getWKT());
                System.out.println("Location SRID: " + location.getSRID());

                // Metadata
                ResultSetMetaData meta = rs.getMetaData();
                System.out.println("Column 1 type: " + meta.getColumnTypeName(1)); // GEOMETRY
                System.out.println("Column 1 class: " + meta.getColumnClassName(1)); // com.databricks.jdbc.api.IGeometry
                System.out.println("Column 2 type: " + meta.getColumnTypeName(2)); // GEOGRAPHY
                System.out.println("Column 2 class: " + meta.getColumnClassName(2)); // com.databricks.jdbc.api.IGeography
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Пример. Использование транзакций

Многооператорные транзакции группируют несколько инструкций SQL в одно атомарное действие. Все операции завершаются успешно вместе или завершаются неудачно вместе. Таблицы должны иметь включенные коммиты, управляемые каталогом.

В следующем примере передаются средства между двумя счетами. Обе UPDATE инструкции выполняются как единая атомарная единица. Если что-либо не удается, откат предотвращает возможность частичных обновлений.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.sql.SQLException;

public class DatabricksTransactionExample {

    public static void main(String[] args) {

        String jdbcUrl = "jdbc:databricks://dbc-a1b2345c-d6e7.cloud.databricks.com/default;" +
                         "transportMode=http;" +
                         "ssl=1;" +
                         "AuthMech=3;" +
                         "httpPath=sql/1.0/warehouses/abc123def456;" +
                         "uid=token;" +
                         "pwd=your-access-token";

        try (Connection connection = DriverManager.getConnection(jdbcUrl);
             Statement stmt = connection.createStatement()) {

            connection.setAutoCommit(false);

            try {
                stmt.executeUpdate(
                    "UPDATE accounts SET balance = balance - 100.00 WHERE account_id = 'ACC001'");
                stmt.executeUpdate(
                    "UPDATE accounts SET balance = balance + 100.00 WHERE account_id = 'ACC002'");
                stmt.executeUpdate(
                    "INSERT INTO audit_log VALUES ('ACC001', 'ACC002', 100.00, current_timestamp())");

                connection.commit();

            } catch (SQLException e) {
                connection.rollback();
                throw e;
            }

        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

При использовании транзакций с JDBC:

  • Позвоните commit() или rollback() явно перед закрытием подключений.
  • Предпочитайте методы JDBC (setAutoCommit(), commit(), rollback()) по сравнению с командами SQL для переносимости.
  • Не делитесь одним подключением между несколькими потоками— создайте отдельное подключение для каждого потока.

Дополнительные сведения о транзакциях см. в разделе "Транзакции".