Aracılığıyla paylaş


JDBC sürücüsünü kullanarak sorgu çalıştırma

Bu sayfada Databricks JDBC Sürücüsü, sürüm 3 ve üzerini kullanarak sorguların nasıl çalıştırıldığını gösteren örnekler yer alır.

Uyarı

Databricks JDBC Sürücüsünde parametreli deyimler için parametre sınırı 256'dır.

Örnek: Sorgu çalıştırma

Aşağıdaki örnekte , Azure Databricks işlem kaynağı kullanarak Databricks SQL sorgusu çalıştırmak için Databricks JDBC Sürücüsünün nasıl kullanılacağı gösterilmektedir.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.Properties;

public class DatabricksJDBCExample {

    public static void main(String[] args) {

        Class.forName("com.databricks.client.jdbc.Driver");

        // Set JDBC URL properties
        String jdbcUrl = "jdbc:databricks://dbc-a1b2345c-d6e7.cloud.databricks.com:443";
        Properties connectionProperties = new Properties();
        connectionProperties.put("httpPath", "sql/protocolv1/o/123456780012345/0123-123450-z000pi22");
        connectionProperties.put("ssl", "1");

        // Set authentication properties (personal access token)
        connectionProperties.put("AuthMech", "3");
        connectionProperties.put("user", "token");
        connectionProperties.put("password", "12345678901234667890abcdabcd");

        // Set logging properties
        connectionProperties.put("logPath", "logs/myapplication.log");

        // Establish connection and execute query
        try (Connection connection = DriverManager.getConnection(jdbcUrl, connectionProperties);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("SELECT * FROM samples.nyctaxi.trips")) {

            // Get metadata and column names
            ResultSetMetaData metaData = resultSet.getMetaData();
            String[] columns = new String[metaData.getColumnCount()];
            for (int i = 0; i < columns.length; i++) {
                columns[i] = metaData.getColumnName(i + 1);
            }

            // Process and print the result set
            while (resultSet.next()) {
                System.out.print("Row " + resultSet.getRow() + "=[");
                for (int i = 0; i < columns.length; i++) {
                    if (i != 0) {
                        System.out.print(", ");
                    }
                    System.out.print(columns[i] + "='" + resultSet.getObject(i + 1) + "'");
                }
                System.out.println("]");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Örnek: Sorguyu zaman uyumsuz olarak çalıştırma

Aşağıdaki örneklerde, zaman uyumsuz databricks SQL sorgusunu çalıştırmak ve işlemek için Databricks JDBC Sürücüsünün nasıl kullanılacağı gösterilmektedir.

API başvurusu için bkz. Databricks JDBC Sürücüsü için Java API başvurusu.

Bir deyimi eşzamansız yürütmeye başlayın.

Statement statement = conn.createStatement();
IDatabricksStatement dbStatement = statement.unwrap(IDatabricksStatement.class);

ResultSet result = dbStatement.executeAsync(sql);
IDatabricksResultSet asyncResult = result.unwrap(IDatabricksResultSet.class);
IExecutionStatus asyncStatus = asyncResult.getExecutionStatus();
long startTime = System.currentTimeMillis();
while ((asyncStatus.getExecutionState() == ExecutionState.RUNNING |  asyncStatus.getExecutionState() == ExecutionState.PENDING) || (startTime + timeout < System.currentTimeMillis())) {
  Thread.sleep(1000); // Sleep for 1000 ms
  asyncResult = dbStatement.getExecutionResult().unwrap(IDatabricksResultSet.class);
  asyncStatus = asyncResult.getExecutionStatus();
}

if (asyncStatus.getExecutionStatus() == ExecutionState.RUNNING | ExecutionState.PENDING) {
  dbStatement.cancel();
}

if (asyncStatus.getExecutionStatus() == ExecutionState.SUCCEEDED) {
  // process result set
}

if (asyncStatus.getExecutionStatus() == ExecutionState.FAILED) {
  String sqlState = asyncStatus.getSqlState();
  String errorMessage = asyncStatus.getErrorMessage();
  // log error code and message
}

Ayrı bir iş parçacığında bir deyimi işle.

Statement statement = conn1.createStatement();
IDatabricksStatement dbStatement = statement.unwrap(IDatabricksStatement.class);

ResultSet asyncResult = dbStatement.executeAsync(sql);
IDatabricksResultSet drs = asyncResult.unwrap(IDatabricksResultSet.class);
String statementId = drs.getStatementId();

ExecutionState state = drs.getExecutionStatus().getExecutionState();

while (state != ExecutionState.SUCCEEDED) {
  Thread.sleep(sleepInterval);
  asyncResult = dbStatement.getExecutionResult();
  state = asyncResult.unwrap(IDatabricksResultSet.class).getExecutionStatus().getExecutionState();
}


// In another thread
IDatabricksConnection dbConn2 = conn2.unwrap(IDatabricksConnection.class);
IDatabricksStatement asyncStatementHandle = dbConn2.getStatement(statementId).unwrap(IDatabricksStatement.class);
IDatabricksResultSet asyncResultHandle = asyncStatementHandle.getExecutionResult().unwrap(IDatabricksResultSet.class);

// Cancel if needed
if (asyncResultHandle.getExecutionStatus().getExecutionState() == ExecutionState.PENDING | asyncResultHandle.getExecutionStatus().getExecutionState() == ExecutionState.RUNNING) {
  asyncStatementHandle.cancel();
}

Bağlantı kimliğini kullanarak bağlantıyı kapatın:


// Get connection-Id from existing connection
String connectionId = conn.unwrap(IDatabricksConnection.class).getConnectionId();

// Close the connection from other thread using same JDBC Url and connection properties and connection-Id retrieved from above
com.databricks.client.jdbc.Driver.getInstance().closeConnection(jdbcUrl, properties, connectionId);

Örnek: Jeo-uzamsal verileri sorgulama

Aşağıdaki örnekte Databricks JDBC Sürücüsü kullanılarak jeo-uzamsal veri türlerini sorgulama ve alma işlemleri gösterilmektedir. Jeo-uzamsal verileri yapılandırılmış Java nesneleri olarak almak için EnableComplexDatatypeSupport ve EnableGeoSpatialSupport bağlantı özelliklerini etkinleştirin.

Jeo-uzamsal veri türleri ve işlevleri hakkında daha fazla bilgi için bkz. ST jeo-uzamsal işlevler.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.Properties;
import com.databricks.jdbc.api.IGeometry;
import com.databricks.jdbc.api.IGeography;

public class GeospatialExample {

    public static void main(String[] args) {

        // Set JDBC URL properties
        String jdbcUrl = "jdbc:databricks://dbc-a1b2345c-d6e7.cloud.databricks.com:443";
        Properties connectionProperties = new Properties();
        connectionProperties.put("httpPath", "sql/protocolv1/o/123456780012345/0123-123450-z000pi22");
        connectionProperties.put("ssl", "1");

        // Set authentication properties (personal access token)
        connectionProperties.put("AuthMech", "3");
        connectionProperties.put("user", "token");
        connectionProperties.put("password", "12345678901234667890abcdabcd");

        // Enable geospatial support
        connectionProperties.put("EnableComplexDatatypeSupport", "1");
        connectionProperties.put("EnableGeoSpatialSupport", "1");

        // Establish connection and execute geospatial query
        try (Connection connection = DriverManager.getConnection(jdbcUrl, connectionProperties);
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery(
                 "SELECT ST_Point(1.0, 2.0) as point, " +
                 "ST_GeogFromText('POINT(-122.4194 37.7749)') as location")) {

            while (rs.next()) {
                // Retrieve GEOMETRY object
                IGeometry point = (IGeometry) rs.getObject("point");
                System.out.println("Point WKT: " + point.getWKT());
                System.out.println("Point SRID: " + point.getSRID());

                // Retrieve GEOGRAPHY object
                IGeography location = (IGeography) rs.getObject("location");
                System.out.println("Location WKT: " + location.getWKT());
                System.out.println("Location SRID: " + location.getSRID());

                // Metadata
                ResultSetMetaData meta = rs.getMetaData();
                System.out.println("Column 1 type: " + meta.getColumnTypeName(1)); // GEOMETRY
                System.out.println("Column 1 class: " + meta.getColumnClassName(1)); // com.databricks.jdbc.api.IGeometry
                System.out.println("Column 2 type: " + meta.getColumnTypeName(2)); // GEOGRAPHY
                System.out.println("Column 2 class: " + meta.getColumnClassName(2)); // com.databricks.jdbc.api.IGeography
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Örnek: İşlemlerin kullanılması

Çok deyimli işlemler birden çok SQL deyimini tek bir atomik ünitede gruplandırıyor. Tüm ifadeler birlikte başarılı olur veya birlikte başarısız olur. Tablolarda katalog tarafından yönetilen işlemeler etkinleştirilmelidir.

Aşağıdaki örnek, iki hesap arasında fon aktarır. Her iki deyim de UPDATE tek bir atomik birim olarak çalışır. Herhangi biri başarısız olursa geri alma işlemi kısmi güncelleştirmeleri engeller.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.sql.SQLException;

public class DatabricksTransactionExample {

    public static void main(String[] args) {

        String jdbcUrl = "jdbc:databricks://dbc-a1b2345c-d6e7.cloud.databricks.com/default;" +
                         "transportMode=http;" +
                         "ssl=1;" +
                         "AuthMech=3;" +
                         "httpPath=sql/1.0/warehouses/abc123def456;" +
                         "uid=token;" +
                         "pwd=your-access-token";

        try (Connection connection = DriverManager.getConnection(jdbcUrl);
             Statement stmt = connection.createStatement()) {

            connection.setAutoCommit(false);

            try {
                stmt.executeUpdate(
                    "UPDATE accounts SET balance = balance - 100.00 WHERE account_id = 'ACC001'");
                stmt.executeUpdate(
                    "UPDATE accounts SET balance = balance + 100.00 WHERE account_id = 'ACC002'");
                stmt.executeUpdate(
                    "INSERT INTO audit_log VALUES ('ACC001', 'ACC002', 100.00, current_timestamp())");

                connection.commit();

            } catch (SQLException e) {
                connection.rollback();
                throw e;
            }

        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

JDBC ile işlemler kullanılırken:

  • Bağlantıları kapatmadan önce commit() veya rollback() açıkça çağırın.
  • Taşınabilirlik için SQL komutları yerine JDBC yöntemlerini (setAutoCommit(), commit(), rollback()) tercih edin.
  • Birden çok iş parçacığı arasında tek bir bağlantı paylaşmayın; her iş parçacığı için ayrı bir bağlantı oluşturun.

İşlemler hakkında daha fazla bilgi için bkz. İşlemler.