使用 Node.js 在 Azure Cosmos DB for PostgreSQL 上連線及執行 SQL 命令

適用於: Azure Cosmos DB for PostgreSQL (由 PostgreSQL 的超大規模 (Citus) 資料庫延伸模組提供)

此快速入門說明如何使用 Node.js 程式碼來連線到叢集,並使用 SQL 陳述式來建立資料表。 接著,您會在資料庫中插入、查詢、更新和刪除資料。 此文章中的步驟假設您已熟悉 Node.js 開發,但不熟悉使用 Azure Cosmos DB for PostgreSQL。

安裝 PostgreSQL 程式庫

此文章中的程式碼範例需要 pg (英文) 程式庫才能與 PostgreSQL 伺服器連接。 您必須使用自己的語言套件管理員 (例如 npm) 來安裝 pg。

連線、建立資料表及插入資料

建立通用連線模組

提示

以下範例程式碼使用連線集區來建立及管理與 PostgreSQL 的連線。 我們強烈建議使用應用程式端連線共用,原因如下:

  • 可確保應用程式不會產生過多資料庫連線,進而避免超過連線限制。
  • 可協助大幅改善效能,包括延遲和輸送量。 PostgreSQL 伺服器處理序必須進行派生才能處理每個新連線,而重複使用連線可避免派生帶來的負擔。

建立名為 [db] 的資料夾,然後在此資料夾中建立包含下列常見連線程式碼的 citus.js 檔案。 在此程式碼中,以叢集名稱取代 <cluster>,並以管理員密碼取代 <password>。

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

建立表格

使用下列程式碼搭配 CREATE TABLE 和 INSERT INTO SQL 陳述式來連線和載入資料。 程式碼會建立新的 pharmacy 資料表,並插入一些範例資料。

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

散發資料表

Azure Cosmos DB for PostgreSQL 可提供您在多個節點之間散發資料表的強大功能,以取得可擴縮性。 下列命令可讓您分散資料表。 您可以在這裡深入了解 create_distributed_table 和分散資料行。

注意

散發資料表可讓其在新增至叢集的任何背景工作節點上成長。

使用下列程式碼來連線至資料庫,並分散資料表。

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

讀取資料

使用下列程式碼搭配 SELECT SQL 陳述式來連線和讀取資料。

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

更新資料

使用下列程式碼搭配 UPDATE SQL 陳述式來連線和更新資料。

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

刪除資料

使用下列程式碼搭配 DELETE SQL 陳述式來連線和刪除資料。

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

適用於快速擷取的 COPY 命令

COPY 命令會在將資料內嵌至 Azure Cosmos DB for PostgreSQL 時產生極大的輸送量。 COPY 命令可以在檔案中擷取資料,或從記憶體中的微批次資料擷取以進行即時擷取。

用來從檔案載入資料的 COPY 命令

下列程式碼會將資料從 CSV 檔案複製到資料庫資料表。 程式碼需要 pg-copy-streams (英文) 套件與 pharmacies.csv 檔案。

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

用來載入記憶體內部資料的 COPY 命令

下列程式碼會將記憶體內部資料複製到資料表。 程式碼需要 through2 (英文) 套件,其能允許管道鏈結。

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

適用於資料庫要求失敗的應用程式重試

有時候,來自您應用程式的資料庫要求可能會失敗。 這類問題可能在不同的情況下發生,例如應用程式與資料庫之間的網路失敗、密碼不正確等。有些問題可能是暫時性的,而且會在幾秒到幾分鐘內自行解決。 您可以在應用程式中設定重試邏輯,以克服暫時性錯誤。

在應用程式中設定重試邏輯有助於改善使用者體驗。 在失敗案例中,使用者只會多等一些時間讓應用程式服務要求,而不會遇到錯誤。

下列範例示範如何在應用程式中實作重試邏輯。 範例程式碼片段會每隔 60 秒嘗試一次資料庫要求 (最多五次) 直到成功為止。 您可以根據應用程式的需求來設定重試次數和頻率。

在此程式碼中,以叢集名稱取代 <cluster>,並以管理員密碼取代 <password>。

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

下一步