Megosztás a következőn keresztül:


SQL-parancsok csatlakoztatása és futtatása a Node.js használatával az Azure Cosmos DB for PostgreSQL-en

A KÖVETKEZŐKRE VONATKOZIK: Azure Cosmos DB for PostgreSQL (a Citus adatbázisbővítménye a PostgreSQL-re)

Ez a rövid útmutató bemutatja, hogyan használhat Node.js kódot a fürthöz való csatlakozáshoz, és hogyan hozhat létre táblát SQL-utasítások használatával. Ezután adatokat szúrhat be, kérdezhet le, frissíthet és törölhet az adatbázisban. A cikk lépései feltételezik, hogy ismeri Node.js fejlesztést, és még nem ismeri az Azure Cosmos DB for PostgreSQL-et.

A PostgreSQL-kódtár telepítése

A cikkben szereplő kódpéldák megkövetelik, hogy a pg-kódtár a PostgreSQL-kiszolgálóval kommunikáljon. Telepítenie kell a pg-t a nyelvi csomagkezelővel (például npm).

Csatlakozás, tábla létrehozása és adatok beszúrása

A közös kapcsolati modul létrehozása

Tipp.

Az alábbi mintakód egy kapcsolatkészletet használ a PostgreSQL-kapcsolatok létrehozásához és kezeléséhez. Az alkalmazásoldali kapcsolatkészletezés erősen ajánlott, mert:

  • Biztosítja, hogy az alkalmazás ne hozzon létre túl sok kapcsolatot az adatbázishoz, így elkerülheti a kapcsolati korlátok túllépését.
  • Ez jelentősen javíthatja a teljesítményt – a késést és az átviteli sebességet is. A PostgreSQL-kiszolgáló folyamatának elágaztatást kell végeznie az egyes új kapcsolatok kezeléséhez, és a kapcsolat újrahasználása elkerüli ezt a többletterhelést.

Hozzon létre egy db nevű mappát, és ebben a mappában hozzon létre egy citus.js fájlt, amely az alábbi gyakori kapcsolatkódot tartalmazza. Ebben a kódban cserélje le <a fürtöt> a fürt nevére és <jelszavára> a rendszergazdai jelszóra.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Tábla létrehozása

A következő kóddal csatlakozhat, és betöltheti az adatokat a CREATE TABLE és az INSERT INTO SQL-utasításokkal. A kód létrehoz egy új pharmacy táblát, és beszúr néhány mintaadatot.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Táblák elosztása

Az Azure Cosmos DB for PostgreSQL lehetővé teszi a táblák több csomópont közötti elosztását a méretezhetőség érdekében. Az alábbi parancs lehetővé teszi egy tábla terjesztését. Erről és a terjesztési oszlopról create_distributed_table itt olvashat bővebben.

Feljegyzés

A táblák elosztása lehetővé teszi, hogy a fürthöz hozzáadott munkavégző csomópontok között növekedjenek.

Az alábbi kóddal csatlakozhat az adatbázishoz, és terjesztheti a táblát.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Adatok beolvasása

A következő kóddal csatlakozhat, és beolvashatja az adatokat a SELECT SQL-utasítással.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Adatok frissítése

Az alábbi kód használatával csatlakozhat és végezheti el az adatok módosítását egy UPDATE SQL-utasítás segítségével.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Adatok törlése

A következő kóddal csatlakozhat, és törölheti az adatokat a DELETE SQL-utasítással.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

COPY parancs a gyors betöltéshez

A COPY parancs hatalmas átviteli sebességet eredményezhet, miközben adatokat tölt be az Azure Cosmos DB for PostgreSQL-be. A COPY parancs betöltheti az adatokat fájlokban, vagy a memóriában lévő adatok mikro kötegeiből valós idejű betöltéshez.

MÁSOLÁS parancs fájlból származó adatok betöltéséhez

Az alábbi kód egy CSV-fájlból egy adatbázistáblába másolja az adatokat. A kódhoz a pg-copy-streams csomag és a fájl pharmacies.csv szükséges.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

COPY parancs a memóriában lévő adatok betöltéséhez

Az alábbi kód a memóriában lévő adatokat egy táblába másolja. A kódhoz az át2 csomag szükséges, amely lehetővé teszi a csőláncolást.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Alkalmazás újrapróbálkozása az adatbázis-kérelmek hibáihoz

Előfordulhat, hogy az alkalmazás adatbázis-kérelmei sikertelenek. Az ilyen problémák különböző forgatókönyvek esetén fordulhatnak elő, például az alkalmazás és az adatbázis közötti hálózati hiba, helytelen jelszó stb. Egyes problémák átmenetiek lehetnek, és néhány másodperc és perc alatt megoldhatják magukat. Az átmeneti hibák elhárításához konfigurálhatja az újrapróbálkozási logikát az alkalmazásban.

Az újrapróbálkozási logika konfigurálása az alkalmazásban segít a végfelhasználói élmény javításában. Hibaforgatókönyvek esetén a felhasználók csupán egy kicsit tovább várnak, amíg az alkalmazás a kéréseket kiszolgálja ahelyett, hogy hibákat tapasztalnak.

Az alábbi példa bemutatja, hogyan implementálhatja az újrapróbálkozás logikáját az alkalmazásban. A mintakódrészlet 60 másodpercenként (legfeljebb ötször) kísérel meg egy adatbázis-kérést, amíg sikeres nem lesz. Az újrapróbálkozések száma és gyakorisága az alkalmazás igényei szerint konfigurálható.

Ebben a kódban cserélje le <a fürtöt> a fürt nevére és <jelszavára> a rendszergazdai jelszóra.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Következő lépések