Utilizar Node.js para ligar e executar comandos SQL no Azure Cosmos DB para PostgreSQL

APLICA-SE A: Azure Cosmos DB para PostgreSQL (com tecnologia da extensão da base de dados Citus para PostgreSQL)

Este início rápido mostra-lhe como utilizar Node.js código para ligar a um cluster e utilizar instruções SQL para criar uma tabela. Em seguida, irá inserir, consultar, atualizar e eliminar dados na base de dados. Os passos neste artigo partem do princípio de que está familiarizado com Node.js desenvolvimento e que não está familiarizado com o Azure Cosmos DB para PostgreSQL.

Instalar a biblioteca do PostgreSQL

Os exemplos de código neste artigo exigem que a biblioteca pg interaja com o servidor PostgreSQL. Terá de instalar o pg com o seu gestor de pacotes de idiomas (como npm).

Ligar, criar uma tabela e inserir dados

Criar o módulo de ligação comum

Dica

O código de exemplo abaixo utiliza um conjunto de ligações para criar e gerir ligações ao PostgreSQL. O agrupamento de ligações do lado da aplicação é altamente recomendado porque:

  • Garante que a aplicação não gera demasiadas ligações à base de dados, pelo que evita exceder os limites de ligação.
  • Pode ajudar a melhorar drasticamente o desempenho, tanto a latência como o débito. O processo do servidor PostgreSQL tem de fork para processar cada nova ligação e reutilizar uma ligação evita essa sobrecarga.

Crie uma pasta denominada db e, dentro desta pasta, crie um ficheiro citus.js que contenha o seguinte código de ligação comum. Neste código, substitua <o cluster> pelo nome do cluster e <palavra-passe pela palavra-passe> do administrador.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Criar uma tabela

Utilize o seguinte código para se ligar e carregar os dados com as instruções SQL CREATE TABLE e INSERT INTO. O código cria uma nova pharmacy tabela e insere alguns dados de exemplo.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Distribuir tabelas

O Azure Cosmos DB para PostgreSQL dá-lhe a super potência de distribuir tabelas em vários nós para escalabilidade. O comando abaixo permite-lhe distribuir uma tabela. Pode saber mais sobre create_distributed_table e a coluna de distribuição aqui.

Nota

Distribuir tabelas permite-lhes crescer em todos os nós de trabalho adicionados ao cluster.

Utilize o seguinte código para ligar à base de dados e distribuir a tabela.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Ler dados

Utilize o código seguinte para se ligar e ler dados com uma instrução SQL SELECT.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Atualizar dados

Utilize o código seguinte para se ligar e atualizar os dados com uma instrução SQL UPDATE.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Eliminar dados

Utilize o código seguinte para se ligar e eliminar os dados com a instrução SQL DELETE.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Comando COPY para ingestão rápida

O comando COPY pode gerar um débito tremendo ao ingerir dados no Azure Cosmos DB para PostgreSQL. O comando COPY pode ingerir dados em ficheiros ou a partir de micro-lotes de dados na memória para ingestão em tempo real.

Comando COPY para carregar dados de um ficheiro

O código seguinte copia dados de um ficheiro CSV para uma tabela de base de dados. O código requer o pacote pg-copy-streams e o ficheiro pharmacies.csv.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

Comando COPY para carregar dados dentro da memória

O código seguinte copia dados dentro da memória para uma tabela. O código requer o pacote through2 , que permite o encadeamento de tubos.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Repetição de aplicações para falhas de pedidos de base de dados

Por vezes, é possível que os pedidos de base de dados da sua aplicação falhem. Estes problemas podem ocorrer em diferentes cenários, como a falha de rede entre a aplicação e a base de dados, palavra-passe incorreta, etc. Alguns problemas podem ser transitórios e resolver-se dentro de alguns segundos a minutos. Pode configurar a lógica de repetição na sua aplicação para ultrapassar os erros transitórios.

Configurar a lógica de repetição na sua aplicação ajuda a melhorar a experiência do utilizador final. Em cenários de falha, os utilizadores apenas aguardarão um pouco mais para que a aplicação sirva pedidos, em vez de se deparar com erros.

O exemplo abaixo mostra como implementar a lógica de repetição na sua aplicação. O fragmento de código de exemplo tenta um pedido de base de dados a cada 60 segundos (até cinco vezes) até ter êxito. O número e a frequência das repetições podem ser configurados com base nas necessidades da sua aplicação.

Neste código, substitua <o cluster> pelo nome do cluster e <palavra-passe pela palavra-passe> do administrador.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Passos seguintes