Bagikan melalui


Gunakan Node.js untuk menyambungkan dan menjalankan perintah SQL di Azure Cosmos DB for PostgreSQL

BERLAKU UNTUK: Azure Cosmos DB for PostgreSQL (didukung oleh ekstensi database Citus ke PostgreSQL)

Mulai cepat ini menunjukkan kepada Anda cara menggunakan kode Node.js untuk menyambungkan ke kluster, dan menggunakan pernyataan SQL untuk membuat tabel. Anda kemudian akan menyisipkan, mengkueri, memperbarui, dan menghapus data dalam database. Langkah-langkah dalam artikel ini mengasumsikan bahwa Anda terbiasa dengan pengembangan Node.js, dan baru bekerja dengan Azure Cosmos DB for PostgreSQL.

Menginstal pustaka PostgreSQL

Contoh kode dalam artikel ini mengharuskan pustaka pg untuk berinteraksi dengan server PostgreSQL. Anda harus menginstal pg dengan manajer paket bahasa Anda (seperti npm).

Menyambungkan, membuat tabel, dan menyisipkan data

Membuat modul koneksi umum

Tip

Kode sampel di bawah ini menggunakan kumpulan koneksi untuk membuat dan mengelola koneksi ke PostgreSQL. Pengumpulan koneksi sisi aplikasi sangat disarankan karena:

  • Tindakan ini memastikan bahwa aplikasi tidak menghasilkan terlalu banyak koneksi ke database, sehingga menghindari kelebihan batas koneksi.
  • Tindakan ini dapat membantu meningkatkan performa secara drastis baik latensi maupun throughput. Proses server PostgreSQL harus melakukan fork untuk menangani setiap koneksi baru, dan menggunakan kembali koneksi menghindari overhead tersebut.

Buat folder bernama db, dan di dalam folder ini buat file citus.js yang berisi kode koneksi umum berikut. Dalam kode ini, ganti <kluster> dengan nama kluster dan <kata sandi> Anda dengan kata sandi administrator Anda.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Buat tabel

Gunakan kode berikut untuk menghubungkan dan memuat data dengan menggunakan pernyataan SQL CREATE TABLE dan INSERT INTO. Kode membuat tabel baru pharmacy dan menyisipkan beberapa data sampel.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Mendistribusikan tabel

Azure Cosmos DB for PostgreSQL memberi Anda kekuatan super dalam mendistribusikan tabel di beberapa simpul untuk skalabilitas. Perintah di bawah ini memungkinkan Anda untuk mendistribusikan tabel. Anda dapat mempelajari selengkapnya tentang create_distributed_table dan kolom distribusi di sini.

Catatan

Mendistribusikan tabel memungkinkan mereka tumbuh di setiap simpul pekerja yang ditambahkan ke kluster.

Gunakan kode berikut untuk menyambungkan ke database dan mendistribusikan tabel.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Membaca data

Gunakan kode berikut untuk menyambungkan dan membaca data menggunakan pernyataan SQL SELECT.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Memperbarui data

Gunakan kode berikut untuk menyambungkan dan memperbarui data menggunakan pernyataan SQL UPDATE.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Menghapus data

Gunakan kode berikut untuk menyambungkan dan menghapus data dengan menggunakan pernyataan DELETE SQL.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Perintah COPY untuk penyerapan cepat

Perintah COPY dapat menghasilkan throughput yang luar biasa sambil menyerap data ke Azure Cosmos DB for PostgreSQL. Perintah COPY dapat menyerap data dalam file, atau dari batch mikro data dalam memori untuk penyerapan real time.

Perintah COPY untuk memuat data dari file

Kode berikut menyalin data dari file CSV ke tabel database. Kode memerlukan paket pg-copy-streams dan file pharmacies.csv.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

Perintah COPY untuk memuat data dalam memori

Kode berikut menyalin data dalam memori ke tabel. Kode memerlukan paket through2 , yang memungkinkan penautan pipa.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Coba lagi aplikasi untuk kegagalan permintaan database

Terkadang ada kemungkinan permintaan database dari aplikasi Anda gagal. Masalah tersebut dapat terjadi dalam skenario yang berbeda, seperti kegagalan jaringan antara aplikasi dan database, kata sandi yang salah, dll. Beberapa masalah mungkin bersifat sementara dan selesai sendiri dalam beberapa detik atau menit. Anda dapat mengonfigurasi logika coba lagi di aplikasi Anda untuk mengatasi kesalahan sementara.

Mengonfigurasi logika coba lagi di aplikasi Anda membantu meningkatkan pengalaman pengguna akhir. Dalam skenario kegagalan, pengguna hanya akan menunggu sedikit lebih lama sampai aplikasi melayani permintaan alih-alih mengalami kesalahan.

Contoh di bawah ini menunjukkan cara menerapkan logika coba lagi di aplikasi Anda. Cuplikan kode sampel mengulang permintaan database setiap 60 detik (hingga lima kali) sampai berhasil. Jumlah dan frekuensi percobaan ulang dapat dikonfigurasi sesuai kebutuhan aplikasi Anda.

Dalam kode ini, ganti <kluster> dengan nama kluster dan <kata sandi> Anda dengan kata sandi administrator Anda.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Langkah berikutnya