用以在 Azure Cosmos DB for PostgreSQL 上連線和執行 SQL 命令的 Java 應用程式

適用於: Azure Cosmos DB for PostgreSQL (由 PostgreSQL 的超大規模 (Citus) 資料庫延伸模組提供)

此快速入門說明如何使用 Java 程式碼來連線到叢集,並使用 SQL 陳述式來建立資料表。 接著,您會在資料庫中插入、查詢、更新和刪除資料。 此文章中的步驟假設您已熟悉 Java 開發和 JDBC,但不熟悉使用 Azure Cosmos DB for PostgreSQL。

設定 Java 專案和連線

建立新的 Java 專案和組態檔,以連線至 Azure Cosmos DB for PostgreSQL。

建立新的 Java 專案

使用您最愛的整合式開發環境 (IDE),使用 groupId test 和 artifactId crud 建立新的 Java 專案。 在專案的根目錄中,新增具有下列內容的 pom.xml 檔案。 此檔案會將 Apache Maven 設定為使用 Java 8 和適用於 Java 的最新 PostgreSQL 驅動程式。

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>test</groupId>
  <artifactId>crud</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>crud</name>
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <version>5.7.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>42.2.12</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.zaxxer/HikariCP -->
    <dependency>
      <groupId>com.zaxxer</groupId>
      <artifactId>HikariCP</artifactId>
      <version>5.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-params</artifactId>
      <version>5.7.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>3.0.0-M5</version>
      </plugin>
    </plugins>
  </build>
</project>

設定資料庫連接

src/main/resources/ 中,使用下列內容建立 application.properties 檔案。 以您的叢集名稱取代 <cluster>,並以您的系統管理密碼取代 <password>。

driver.class.name=org.postgresql.Driver
db.url=jdbc:postgresql://c-<cluster>.<uniqueID>.postgres.cosmos.azure.com:5432/citus?ssl=true&sslmode=require
db.username=citus
db.password=<password>

db.url 屬性中的 ?ssl=true&sslmode=require 字串會告知 JDBC 驅動程式,在連線到資料庫時使用傳輸層安全性 (TLS)。 您必須使用 TLS 搭配 Azure Cosmos DB for PostgreSQL,這是很好的安全性做法。

建立表格

設定具有分散式資料表的資料庫結構描述。 連線到資料庫以建立結構描述和資料表。

產生資料庫結構描述

src/main/resources/ 中,使用下列內容建立 schema.sql 檔案:

DROP TABLE IF EXISTS public.pharmacy;
CREATE TABLE  public.pharmacy(pharmacy_id integer,pharmacy_name text ,city text ,state text ,zip_code integer);
CREATE INDEX idx_pharmacy_id ON public.pharmacy(pharmacy_id);

散發資料表

Azure Cosmos DB for PostgreSQL 可提供您在多個節點之間散發資料表的強大功能,以取得可擴縮性。 下列命令可讓您分散資料表。 您可以在這裡深入了解 create_distributed_table 和分散資料行。

注意

散發資料表可讓其在新增至叢集的任何背景工作節點上成長。

若要散發資料表,請將下一行附加至您在上一節中建立的 schema.sql 檔案。

select create_distributed_table('public.pharmacy','pharmacy_id');

連線到資料庫並建立結構描述

接下來,新增將使用 JDBC 的 Java 程式碼,以從您的叢集儲存和擷取資料。 程式碼會使用 application.propertiesschema.sql 檔案來連線到叢集並建立結構描述。

  1. 使用下列程式碼建立 DButil.java 檔案,其中包含 DButil 類別。 DBUtil 類別會使用 HikariCP 設定 PostgreSQL 的連線共用。 您可使用這個類別來連線到 PostgreSQL 並開始查詢。

    提示

    以下範例程式碼使用連線集區來建立及管理與 PostgreSQL 的連線。 我們強烈建議使用應用程式端連線共用,原因如下:

    • 可確保應用程式不會產生過多資料庫連線,進而避免超過連線限制。
    • 可協助大幅改善效能,包括延遲和輸送量。 PostgreSQL 伺服器處理序必須進行派生才能處理每個新連線,而重複使用連線可避免派生帶來的負擔。
    //DButil.java
    package test.crud;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.sql.SQLException;
    import java.util.Properties;
    
    import javax.sql.DataSource;
    
    import com.zaxxer.hikari.HikariDataSource;
    
    public class DButil {
        private static final String DB_USERNAME = "db.username";
        private static final String DB_PASSWORD = "db.password";
        private static final String DB_URL = "db.url";
        private static final String DB_DRIVER_CLASS = "driver.class.name";
        private static Properties properties =  null;
        private static HikariDataSource datasource;
    
        static {
            try {
                properties = new Properties();
                properties.load(new FileInputStream("src/main/java/application.properties"));
    
                datasource = new HikariDataSource();
                datasource.setDriverClassName(properties.getProperty(DB_DRIVER_CLASS ));
                datasource.setJdbcUrl(properties.getProperty(DB_URL));
                datasource.setUsername(properties.getProperty(DB_USERNAME));
                datasource.setPassword(properties.getProperty(DB_PASSWORD));
                datasource.setMinimumIdle(100);
                datasource.setMaximumPoolSize(1000000000);
                datasource.setAutoCommit(true);
                datasource.setLoginTimeout(3);
            } catch (IOException | SQLException  e) {
                e.printStackTrace();
            }
        }
        public static DataSource getDataSource() {
            return datasource;
        }
    }
    
  2. src/main/java/ 中,建立包含下列程式碼的 DemoApplication.java 檔案:

    package test.crud;
    import java.io.IOException;
    import java.sql.*;
    import java.util.*;
    import java.util.logging.Logger;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import org.postgresql.copy.CopyManager;
    import org.postgresql.core.BaseConnection;
    import java.io.IOException;
    import java.io.Reader;
    import java.io.StringReader;
    
    public class DemoApplication {
    
        private static final Logger log;
    
        static {
            System.setProperty("java.util.logging.SimpleFormatter.format", "[%4$-7s] %5$s %n");
            log =Logger.getLogger(DemoApplication.class.getName());
        }
        public static void main(String[] args)throws Exception
        {
            log.info("Connecting to the database");
            Connection connection = DButil.getDataSource().getConnection();
            System.out.println("The Connection Object is of Class: " + connection.getClass());
            log.info("Database connection test: " + connection.getCatalog());
            log.info("Creating table");
            log.info("Creating index");
            log.info("distributing table");
            Scanner scanner = new Scanner(DemoApplication.class.getClassLoader().getResourceAsStream("schema.sql"));
            Statement statement = connection.createStatement();
            while (scanner.hasNextLine()) {
                statement.execute(scanner.nextLine());
            }
            log.info("Closing database connection");
            connection.close();
        }
    
    }
    

    注意

    執行 DriverManager.getConnection(properties.getProperty("url"), properties); 時會使用資料庫 userpassword 認證。 認證會儲存在 application.properties 檔案中,以引數的形式傳遞。

  3. 您現在可以使用慣用的工具來執行此主要類別:

    • 使用 IDE 時,您應該能夠以滑鼠右鍵按一下 DemoApplication 類別,然後加以執行。
    • 您可使用 Maven 來執行應用程式,請執行:
      mvn exec:java -Dexec.mainClass="com.example.demo.DemoApplication".

應用程式應該連線至 Azure Cosmos DB for PostgreSQL、建立資料庫結構描述,然後關閉連線,接著您應該會在主控台記錄中看到:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Create database schema
[INFO   ] Closing database connection

建立領域類別

DemoApplication 類別旁建立新的 Pharmacy Java 類別,並新增下列程式碼:

public class Pharmacy {
    private Integer pharmacy_id;
    private String pharmacy_name;
    private String city;
    private String state;
    private Integer zip_code;
    public Pharmacy() { }
    public Pharmacy(Integer pharmacy_id, String pharmacy_name, String city,String state,Integer zip_code)
    {
        this.pharmacy_id = pharmacy_id;
        this.pharmacy_name = pharmacy_name;
        this.city = city;
        this.state = state;
        this.zip_code = zip_code;
    }

    public Integer getpharmacy_id() {
        return pharmacy_id;
    }

    public void setpharmacy_id(Integer pharmacy_id) {
        this.pharmacy_id = pharmacy_id;
    }

    public String getpharmacy_name() {
        return pharmacy_name;
    }

    public void setpharmacy_name(String pharmacy_name) {
        this.pharmacy_name = pharmacy_name;
    }

    public String getcity() {
        return city;
    }

    public void setcity(String city) {
        this.city = city;
    }

    public String getstate() {
        return state;
    }

    public void setstate(String state) {
        this.state = state;
    }

    public Integer getzip_code() {
        return zip_code;
    }

    public void setzip_code(Integer zip_code) {
        this.zip_code = zip_code;
    }
    @Override
    public String toString() {
        return "TPharmacy{" +
               "pharmacy_id=" + pharmacy_id +
               ", pharmacy_name='" + pharmacy_name + '\'' +
               ", city='" + city + '\'' +
               ", state='" + state + '\'' +
               ", zip_code='" + zip_code + '\'' +
               '}';
    }
}

這個類別是在您執行 schema.sql 指令碼時建立的 Pharmacy 資料表中對應的領域模型。

插入資料

DemoApplication.java 檔案的 main 方法後面,新增下列方法,以使用 INSERT INTO SQL 陳述式將資料插入資料庫中:

private static void insertData(Pharmacy todo, Connection connection) throws SQLException {
    log.info("Insert data");
    PreparedStatement insertStatement = connection
        .prepareStatement("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code)  VALUES (?, ?, ?, ?, ?);");

    insertStatement.setInt(1, todo.getpharmacy_id());
    insertStatement.setString(2, todo.getpharmacy_name());
    insertStatement.setString(3, todo.getcity());
    insertStatement.setString(4, todo.getstate());
    insertStatement.setInt(5, todo.getzip_code());

    insertStatement.executeUpdate();
}

在 main 方法中新增下列兩行:

Pharmacy todo = new Pharmacy(0,"Target","Sunnyvale","California",94001);
insertData(todo, connection);

執行主要類別現在應該會產生下列輸出:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Closing database connection

讀取資料

讀取先前插入的資料,以驗證程式碼是否正確運作。

DemoApplication.java 檔案的 insertData 方法後面,新增下列方法,以使用 SELECT SQL 陳述式來讀取資料庫中的資料:

private static Pharmacy readData(Connection connection) throws SQLException {
    log.info("Read data");
    PreparedStatement readStatement = connection.prepareStatement("SELECT * FROM Pharmacy;");
    ResultSet resultSet = readStatement.executeQuery();
    if (!resultSet.next()) {
        log.info("There is no data in the database!");
        return null;
    }
    Pharmacy todo = new Pharmacy();
    todo.setpharmacy_id(resultSet.getInt("pharmacy_id"));
    todo.setpharmacy_name(resultSet.getString("pharmacy_name"));
    todo.setcity(resultSet.getString("city"));
    todo.setstate(resultSet.getString("state"));
    todo.setzip_code(resultSet.getInt("zip_code"));
    log.info("Data read from the database: " + todo.toString());
    return todo;
}

在 main 方法中新增下列一行:

todo = readData(connection);

執行主要類別現在應該會產生下列輸出:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Closing database connection

更新資料

更新您先前插入的資料。

同樣在 DemoApplication.java 檔案的 readData 方法後面,新增下列方法,以使用 UPDATE SQL 陳述式來更新資料庫中的資料:

private static void updateData(Pharmacy todo, Connection connection) throws SQLException {
    log.info("Update data");
    PreparedStatement updateStatement = connection
        .prepareStatement("UPDATE pharmacy SET city = ? WHERE pharmacy_id = ?;");

    updateStatement.setString(1, todo.getcity());

    updateStatement.setInt(2, todo.getpharmacy_id());
    updateStatement.executeUpdate();
    readData(connection);
}

在 main 方法中新增下列兩行:

todo.setcity("Guntur");
updateData(todo, connection);

執行主要類別現在應該會產生下列輸出:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Closing database connection

刪除資料

最後,刪除您先前插入的資料。 同樣在 DemoApplication.java 檔案的 updateData 方法後面,新增下列方法,以使用 DELETE SQL 陳述式來刪除資料庫中的資料:

private static void deleteData(Pharmacy todo, Connection connection) throws SQLException {
    log.info("Delete data");
    PreparedStatement deleteStatement = connection.prepareStatement("DELETE FROM pharmacy WHERE pharmacy_id = ?;");
    deleteStatement.setLong(1, todo.getpharmacy_id());
    deleteStatement.executeUpdate();
    readData(connection);
}

您現在可以在 main 方法中新增下列行:

deleteData(todo, connection);

執行主要類別現在應該會產生下列輸出:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Delete data
[INFO   ] Read data
[INFO   ] There is no data in the database!
[INFO   ] Closing database connection

適用於快速擷取的 COPY 命令

COPY 命令會在將資料內嵌至 Azure Cosmos DB for PostgreSQL 時產生極大的輸送量。 COPY 命令可以在檔案中擷取資料,或從記憶體中的微批次資料擷取以進行即時擷取。

用來從檔案載入資料的 COPY 命令

下列程式碼會將資料從 CSV 檔案複製到資料庫資料表。 程式碼範例需要檔案 pharmacies.csv

public static long
copyFromFile(Connection connection, String filePath, String tableName)
throws SQLException, IOException {
    long count = 0;
    FileInputStream fileInputStream = null;

    try {
        Connection unwrap = connection.unwrap(Connection.class);
        BaseConnection  connSec = (BaseConnection) unwrap;

        CopyManager copyManager = new CopyManager((BaseConnection) connSec);
        fileInputStream = new FileInputStream(filePath);
        count = copyManager.copyIn("COPY " + tableName + " FROM STDIN delimiter ',' csv", fileInputStream);
    } finally {
        if (fileInputStream != null) {
            try {
                fileInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    return count;
}

您現在可以在 main 方法中新增下列行:

int c = (int) copyFromFile(connection,"C:\\Users\\pharmacies.csv", "pharmacy");
log.info("Copied "+ c +" rows using COPY command");

執行 main 類別現在應該會產生下列輸出:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Delete data
[INFO   ] Read data
[INFO   ] There is no data in the database!
[INFO ] Copied 5000 rows using COPY command
[INFO   ] Closing database connection

用來載入記憶體內部資料的 COPY 命令

下列程式碼會將記憶體內部資料複製到資料表。

private static void inMemory(Connection connection) throws SQLException,IOException
    {
    log.info("Copying inmemory data into table");
            
    final List<String> rows = new ArrayList<>();
    rows.add("0,Target,Sunnyvale,California,94001");
    rows.add("1,Apollo,Guntur,Andhra,94003");
        
    final BaseConnection baseConnection = (BaseConnection) connection.unwrap(Connection.class);
    final CopyManager copyManager = new CopyManager(baseConnection);

    // COPY command can change based on the format of rows. This COPY command is for above rows.
    final String copyCommand = "COPY pharmacy FROM STDIN with csv";        
       
    try (final Reader reader = new StringReader(String.join("\n", rows))) {
        copyManager.copyIn(copyCommand, reader);
    }
}

您現在可以在 main 方法中新增下列行:

inMemory(connection);

執行主要類別現在應該會產生下列輸出:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Delete data
[INFO   ] Read data
[INFO   ] There is no data in the database!
5000
[INFO   ] Copying in-memory data into table
[INFO   ] Closing database connection

適用於資料庫要求失敗的應用程式重試

有時候,來自您應用程式的資料庫要求可能會失敗。 這類問題可能在不同的情況下發生,例如應用程式與資料庫之間的網路失敗、密碼不正確等。有些問題可能是暫時性的,而且會在幾秒到幾分鐘內自行解決。 您可以在應用程式中設定重試邏輯,以克服暫時性錯誤。

在應用程式中設定重試邏輯有助於改善使用者體驗。 在失敗案例中,使用者只會多等一些時間讓應用程式服務要求,而不會遇到錯誤。

下列範例示範如何在應用程式中實作重試邏輯。 範例程式碼片段會每隔 60 秒嘗試一次資料庫要求 (最多五次) 直到成功為止。 您可以根據應用程式的需求來設定重試次數和頻率。

在此程式碼中,以叢集名稱取代 <cluster>,並以管理員密碼取代 <password>。

package test.crud;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.logging.Logger;
import com.zaxxer.hikari.HikariDataSource;

public class DemoApplication
{
    private static final Logger log;

    static
    {
        System.setProperty("java.util.logging.SimpleFormatter.format", "[%4$-7s] %5$s %n");
        log = Logger.getLogger(DemoApplication.class.getName());
    }
    private static final String DB_USERNAME = "citus";
    private static final String DB_PASSWORD = "<password>";
    private static final String DB_URL = "jdbc:postgresql://c-<cluster>.<uniqueID>.postgres.cosmos.azure.com:5432/citus?sslmode=require";
    private static final String DB_DRIVER_CLASS = "org.postgresql.Driver";
    private static HikariDataSource datasource;

    private static String executeRetry(String sql, int retryCount) throws InterruptedException
    {
        Connection con = null;
        PreparedStatement pst = null;
        ResultSet rs = null;
        for (int i = 1; i <= retryCount; i++)
        {
            try
            {
                datasource = new HikariDataSource();
                datasource.setDriverClassName(DB_DRIVER_CLASS);
                datasource.setJdbcUrl(DB_URL);
                datasource.setUsername(DB_USERNAME);
                datasource.setPassword(DB_PASSWORD);
                datasource.setMinimumIdle(10);
                datasource.setMaximumPoolSize(1000);
                datasource.setAutoCommit(true);
                datasource.setLoginTimeout(3);
                log.info("Connecting to the database");
                con = datasource.getConnection();
                log.info("Connection established");
                log.info("Read data");
                pst = con.prepareStatement(sql);
                rs = pst.executeQuery();
                StringBuilder builder = new StringBuilder();
                int columnCount = rs.getMetaData().getColumnCount();
                while (rs.next())
                {
                    for (int j = 0; j < columnCount;)
                    {
                        builder.append(rs.getString(j + 1));
                        if (++j < columnCount)
                            builder.append(",");
                    }
                    builder.append("\r\n");
                }
                return builder.toString();
            }
            catch (Exception e)
            {
                Thread.sleep(60000);
                System.out.println(e.getMessage());
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception
    {
        String result = executeRetry("select 1", 5);
        System.out.print(result);
    }
}

下一步