Usare Node.js per connettersi ed eseguire comandi SQL in Azure Cosmos DB for PostgreSQL

SI APPLICA A: Azure Cosmos DB for PostgreSQL (basato su estensione database Citus per PostgreSQL)

Questa guida introduttiva illustra come usare il codice Node.js per connettersi a un cluster e usare istruzioni SQL per creare una tabella. Si inseriranno dati per poi eseguire query a riguardo, aggiornarli ed eliminarli nel database. I passaggi descritti in questo articolo presuppongono che si abbia familiarità con lo sviluppo Node.js e non si abbia familiarità con Azure Cosmos DB for PostgreSQL.

Installare la libreria PostgreSQL

Gli esempi di codice in questo articolo richiedono la libreria pg per l'interfaccia con il server PostgreSQL. È necessario installare pg con gestione pacchetti di linguaggio, ad esempio npm.

Connettersi, creare una tabella e inserire dati

Creare il modulo di connessione comune

Suggerimento

Il codice di esempio seguente usa un pool di connessioni per creare e gestire le connessioni a PostgreSQL. Il pool di connessioni sul lato applicazione è fortemente consigliato perché:

  • Garantisce che l'applicazione non generi troppe connessioni al database e quindi evita di superare i limiti di connessione.
  • Può contribuire a migliorare drasticamente la latenza e la velocità effettiva. Il processo del server PostgreSQL deve creare una copia tramite fork per gestire ogni nuova connessione. Riutilizzare una connessione evita tale sovraccarico.

Creare una cartella denominata dbe all'interno di questa cartella creare un file citus.js contenente il codice di connessione comune seguente. In questo codice sostituire <cluster> con il nome del cluster e <password> con la password amministratore.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Crea una tabella

Usare il codice seguente per connettersi e caricare i dati usando le istruzioni SQL CREATE TABLE e INSERT INTO. Il codice crea una nuova tabella pharmacy e inserisce alcuni dati di esempio.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Distribuire le tabelle

Azure Cosmos DB for PostgreSQL offre la super potenza di distribuzione delle tabelle tra più nodi per la scalabilità. Il comando seguente consente di distribuire una tabella. Per altre informazioni su create_distributed_table e sulla colonna di distribuzione, vedere qui.

Nota

La distribuzione delle tabelle consente di aumentare le dimensioni in tutti i nodi di lavoro aggiunti al cluster.

Usare il codice seguente per connettersi al database e distribuire la tabella.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Leggere i dati

Usare il codice seguente per connettersi e leggere i dati usando un'istruzione SQL SELECT.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Aggiornamento dei dati

Usare il codice seguente per connettersi e aggiornare i dati usando un'istruzione SQL UPDATE.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Eliminare dati

Usare il codice seguente per connettersi ed eliminare i dati usando un'istruzione SQL DELETE.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Comando COPY per l'inserimento rapido

Il comando COPY può produrre una velocità effettiva elevata durante l'inserimento di dati in Azure Cosmos DB for PostgreSQL. Il comando COPY può inserire dati in file o da micro batch di dati in memoria per l'inserimento in tempo reale.

Comando COPY per caricare dati da un file

Il codice seguente copia i dati da un file CSV a una tabella di database. Il codice richiede il pacchetto pg-copy-streams e il file pharmacies.csv.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

Comando COPY per caricare i dati in memoria

Il codice seguente copia i dati in memoria in una tabella. Il codice richiede il pacchetto through2, che consente il concatenamento di pipe.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Retry dell'app per gli errori delle richieste di database

A volte è possibile che le richieste di database dall'applicazione non riescano. Tali problemi possono verificarsi in scenari diversi, ad esempio errori di rete tra app e database, password non corretta e così via. Alcuni problemi possono essere temporanei e risolversi in pochi secondi o pochi minuti. È possibile configurare la logica di retry nell'app per risolvere gli errori temporanei.

La configurazione della logica di retry nell'app consente di migliorare l'esperienza dell'utente finale. In scenari di errore, gli utenti attenderanno semplicemente un po ' più a lungo per consentire all'applicazione di gestire le richieste, anziché riscontrare errori.

L'esempio seguente illustra come implementare la logica di retry nell'app. Il frammento di codice di esempio tenta una richiesta di database ogni 60 secondi (fino a cinque volte) fino a quando non riesce. Il numero e la frequenza dei tentativi possono essere configurati in base alle esigenze dell'applicazione.

In questo codice sostituire <cluster> con il nome del cluster e <password> con la password amministratore.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Passaggi successivi