Použití Node.js k připojení a spouštění příkazů SQL ve službě Azure Cosmos DB for PostgreSQL

PLATÍ PRO: Azure Cosmos DB for PostgreSQL (využívá rozšíření databáze Citus na PostgreSQL)

V tomto rychlém startu se dozvíte, jak se pomocí kódu Node.js připojit ke clusteru a pomocí příkazů SQL vytvořit tabulku. Potom v databázi vložíte data, budete je dotazovat, aktualizovat a odstranit. Kroky v tomto článku předpokládají, že jste obeznámeni s vývojem Node.js a začínáte pracovat se službou Azure Cosmos DB for PostgreSQL.

Instalace knihovny PostgreSQL

Příklady kódu v tomto článku vyžadují, aby knihovna pg byla pro rozhraní se serverem PostgreSQL. Nástroj pg budete muset nainstalovat pomocí správce jazykových balíčků (například npm).

Připojení, vytvoření tabulky a vložení dat

Vytvoření modulu common connection

Tip

Následující ukázkový kód používá fond připojení k vytvoření a správě připojení k PostgreSQL. Sdružování připojení na straně aplikace se důrazně doporučuje, protože:

  • Zajišťuje, že aplikace negeneruje příliš mnoho připojení k databázi, a zabrání tak překročení limitů připojení.
  • Může výrazně zlepšit výkon – latenci i propustnost. Proces serveru PostgreSQL musí vytvořit fork, aby zvládl každé nové připojení, a opakovaného použádání připojení zabrání této režii.

Vytvořte složku s názvem db a v této složce vytvořte citus.js soubor, který obsahuje následující běžný kód připojení. V tomto kódu nahraďte <cluster> názvem a <heslem> vašeho clusteru heslem správce.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Vytvoření tabulky

Pomocí následujícího kódu se připojte a nahrajte data s využitím příkazů CREATE TABLE a INSERT INTO jazyka SQL. Kód vytvoří novou pharmacy tabulku a vloží ukázková data.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Distribuce tabulek

Azure Cosmos DB for PostgreSQL nabízí super výkon při distribuci tabulek napříč několika uzly pro zajištění škálovatelnosti. Následující příkaz umožňuje distribuovat tabulku. Další informace o create_distributed_table a distribučním sloupci najdete tady.

Poznámka

Distribuce tabulek umožňuje jejich růst napříč všemi pracovními uzly přidanou do clusteru.

Pomocí následujícího kódu se připojte k databázi a distribuujte tabulku.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Čtení dat

Pomocí následujícího kódu se připojte a načtěte data s využitím příkazu SELECT jazyka SQL.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Aktualizace dat

Pomocí následujícího kódu se připojte a aktualizujte data s využitím příkazu UPDATE jazyka SQL.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Odstranění dat

Pomocí následujícího kódu se připojte a odstraňte data s využitím příkazu DELETE jazyka SQL.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Příkaz COPY pro rychlý příjem dat

Příkaz COPY může při ingestování dat do služby Azure Cosmos DB for PostgreSQL poskytovat obrovskou propustnost . Příkaz COPY může ingestovat data v souborech nebo z mikrodávek dat v paměti pro příjem dat v reálném čase.

Příkaz COPY pro načtení dat ze souboru

Následující kód zkopíruje data ze souboru CSV do databázové tabulky. Kód vyžaduje balíček pg-copy-streams a soubor pharmacies.csv.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

Příkaz COPY pro načtení dat v paměti

Následující kód zkopíruje data v paměti do tabulky. Kód vyžaduje balíček through2 , který umožňuje řetězení potrubí.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Opakování pokusu aplikace o selhání žádostí o databázi

Někdy je možné, že žádosti o databázi z vaší aplikace selžou. K takovým problémům může docházet v různých scénářích, jako je selhání sítě mezi aplikací a databází, nesprávné heslo atd. Některé problémy můžou být přechodné a samy se vyřeší během několika sekund až minut. V aplikaci můžete nakonfigurovat logiku opakování, aby se přechodné chyby vyřešily.

Konfigurace logiky opakování ve vaší aplikaci pomáhá zlepšit prostředí koncového uživatele. Ve scénářích selhání budou uživatelé pouze chvíli čekat, než aplikace obslouží požadavky, místo aby došlo k chybám.

Následující příklad ukazuje, jak do aplikace implementovat logiku opakování. Ukázkový fragment kódu zkouší požadavek databáze každých 60 sekund (až pětkrát), dokud nebude úspěšný. Počet a četnost opakování je možné nakonfigurovat na základě potřeb vaší aplikace.

V tomto kódu nahraďte <cluster> názvem a <heslem> vašeho clusteru heslem správce.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Další kroky