Utiliser Node.js pour se connecter et exécuter des commandes SQL sur Azure Cosmos DB for PostgreSQL

S’APPLIQUE À : Azure Cosmos DB for PostgreSQL (avec l’extension de base de données Citus pour PostgreSQL)

Ce guide de démarrage rapide vous montre comment utiliser du code Node.js pour vous connecter à un cluster et utiliser des instructions SQL pour créer une table. Ensuite, vous insérerez, interrogerez, mettrez à jour et supprimerez des données dans la base de données. Cet article suppose que vous connaissez les bases du développement Node.js, et que vous ne savez pas utiliser Azure Cosmos DB for PostgreSQL.

Installer la bibliothèque PostgreSQL

Les exemples de code de cet article nécessitent que la bibliothèque pg s’interface avec le serveur PostgreSQL. Vous devez installer pg avec votre gestionnaire de package de langage (par exemple npm).

Se connecter, créer une table et insérer des données

Créer le module de connexion commun

Conseil

L’exemple de code ci-dessous utilise un pool de connexions pour créer et gérer des connexions à PostgreSQL. Le regroupement de connexions côté application est fortement recommandé, car :

  • Elle garantit que l’application ne génère pas trop de connexions à la base de données, et évite ainsi de dépasser les limites de connexion.
  • Cela peut contribuer à améliorer considérablement les performances, à la fois la latence et le débit. Le processus serveur PostgreSQL doit être dupliqué pour gérer chaque nouvelle connexion et réutiliser une connexion, ce qui évite cette surcharge.

Créez un dossier nommé db, puis, dans ce dossier, créez un fichier citus.js contenant le code de connexion commun suivant. Dans ce code, remplacez <cluster> par le nom de votre cluster et <mot de passe> par votre mot de passe d’administration.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Créer une table

Utilisez le code suivant pour vous connecter et charger les données à l’aide d’instructions SQL CREATE TABLE et INSERT INTO. Le code crée une nouvelle table pharmacy et insère un échantillon de données.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Distribuer des tables

Azure Cosmos DB for PostgreSQL vous offre la super puissance de distribution de tables sur plusieurs nœuds à des fins de scalabilité. La commande ci-dessous vous permet de distribuer une table. Vous pouvez en savoir plus sur create_distributed_table et la colonne de distribution ici.

Notes

La distribution de tables leur permet de croître sur tous les nœuds Worker ajoutés au cluster.

Utilisez le code suivant pour vous connecter à la base de données et distribuer la table.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Lire les données

Utilisez le code suivant pour vous connecter et lire des données à l’aide d’une instruction SQL SELECT.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Mettre à jour des données

Utilisez le code suivant pour vous connecter et mettre à jour les données à l’aide d’une instruction SQL UPDATE.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Suppression de données

Utilisez le code suivant pour vous connecter et supprimer les données à l’aide d’une instruction SQL DELETE.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Commande COPY pour l’ingestion rapide

La commande COPY peut générer un débit considérable lors de l’ingestion de données dans Azure Cosmos DB for PostgreSQL. La commande COPY peut ingérer des données dans des fichiers ou à partir de micro-lots de données en mémoire pour l’ingestion en temps réel.

Commande COPY pour charger des données à partir d’un fichier

Le code suivant copie des données à partir d’un fichier CSV dans une table de base de données. Le code requiert le package pg-copy-streams et le fichier pharmacies.csv.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

Commande COPY pour charger des données en mémoire

Le code suivant copie des données en mémoire dans une table. Le code requiert le package through2 qui permet le chaînage de canaux.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Nouvelle tentative d’application pour les échecs de requête de base de données

Il arrive parfois que les requêtes de base de données en provenance de votre application échouent. Ces problèmes peuvent se produire dans différents scénarios, comme une défaillance réseau entre une application et une base de données, un mot de passe incorrect, etc. Certains problèmes peuvent être temporaires et se résoudre d’eux-mêmes en quelques secondes ou quelques minutes. Vous pouvez configurer la logique de nouvelle tentative dans votre application pour surmonter les erreurs temporaires.

La configuration de la logique de nouvelle tentative dans votre application permet d’améliorer l’expérience de l’utilisateur final. Dans les scénarios de défaillance, les utilisateurs attendent simplement un peu plus longtemps que l’application traite les requêtes, au lieu d’être confrontés à des erreurs.

L’exemple ci-dessous montre comment implémenter une logique de nouvelle tentative dans votre application. L’exemple d’extrait de code tente une requête de base de données toutes les 60 secondes (jusqu’à cinq fois) jusqu’à ce qu’elle réussisse. Le nombre et la fréquence des nouvelles tentatives peuvent être configurés en fonction des besoins de votre application.

Dans ce code, remplacez <cluster> par le nom de votre cluster et <mot de passe> par votre mot de passe d’administration.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Étapes suivantes