Usar o Node.js para se conectar e executar comandos SQL no Azure Cosmos DB for PostgreSQL

APLICA-SE A: Azure Cosmos DB for PostgreSQL (da plataforma da extensão de dados Citus para PostgreSQL)

Este início rápido mostra como usar o código Node.js para conectar um cluster e usar as instruções SQL para criar uma tabela. Em seguida, você irá inserir, consultar, atualizar e excluir dados no banco de dados. As etapas neste artigo pressupõem que você esteja familiarizado com o desenvolvimento usando Node.js e que tenha começado a trabalhar recentemente com o Azure Cosmos DB for PostgreSQL.

Instalar a biblioteca PostgreSQL

Os exemplos de código neste artigo exigem que a biblioteca pg faça interface com o servidor PostgreSQL. Será necessário instalar o pg com o gerenciador de pacotes de idiomas (como npm).

Conectar-se, criar uma tabela e inserir dados

Criar o módulo de conexão comum

Dica

O código de exemplo abaixo usa um pool de conexões para criar e gerenciar conexões com o PostgreSQL. O pool de conexões do lado do aplicativo é altamente recomendado porque:

  • Ele garante que o aplicativo não gere muitas conexões com o banco de dados e, portanto, evita exceder os limites de conexão.
  • Isso pode ajudar a aprimorar drasticamente o desempenho, tanto de latência quanto de taxa de transferência. O processo do servidor PostgreSQL precisa ser bifurcado para gerenciar cada nova conexão e reutilizar uma conexão evita essa sobrecarga.

Crie uma pasta chamada db e, dentro dessa pasta, crie um arquivo citus.js que contenha o código de conexão comum a seguir. No código, substitua <cluster> pelo nome do cluster e <senha> pela senha do administrador.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Criar uma tabela

Use o código a seguir para se conectar e carregar os dados usando as instruções SQL CREATE TABLE e INSERT INTO. O código cria uma nova tabela pharmacy e insere alguns dados de exemplo.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Distribuir tabelas

O Azure Cosmos DB for PostgreSQL oferece a superpotência da distribuição de tabelas entre vários nós, permitindo a escalabilidade. O comando abaixo permite a distribuição de uma tabela. Saiba mais sobre create_distributed_table e a coluna de distribuição aqui.

Observação

A distribuição de tabelas permite que elas cresçam em todos os nós de trabalho adicionados ao cluster.

Use o código a seguir para se conectar ao banco de dados e distribuir a tabela.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Ler dados

Use o código a seguir para conectar-se e ler os dados usando uma instrução SQL SELECT.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Atualizar dados

Use o código a seguir para conectar-se e atualizar os dados usando uma instrução SQL UPDATE.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Excluir dados

Use o código a seguir para conectar-se e excluir os dados usando uma instrução SQL DELETE.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Comando COPY para ingestão rápida

O comando COPY pode gerar uma taxa de transferência enorme ao ingerir dados no Azure Cosmos DB for PostgreSQL. O comando COPY pode ingerir dados em arquivos ou de microlotes de dados na memória para ingestão em tempo real.

Comando COPY para carregar dados de um arquivo

O código a seguir copia dados de um arquivo CSV para uma tabela de banco de dados. O código requer o pacote pg-copy-streams e o arquivo pharmacies.csv.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

Comando COPY para carregar dados na memória

O código a seguir copia dados na memória em uma tabela. O código requer o pacote through2, que permite o encadeamento de pipe.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Repetição de aplicativo para falhas de solicitação de banco de dados

Às vezes, as solicitações de banco de dados do aplicativo podem falhar. Esses problemas podem ocorrer em diferentes cenários, como falha de rede entre o aplicativo e o banco de dados, senha incorreta etc. Alguns problemas podem ser transitórios e serem resolvidos em alguns segundos ou minutos. Você pode configurar a lógica de repetição no aplicativo para corrigir os erros transitórios.

A configuração da lógica de repetição no aplicativo ajuda a aprimorar a experiência do usuário final. Em cenários de falha, os usuários vão apenas esperar um pouco mais para que o aplicativo atenda às solicitações, em vez de receber erros.

O exemplo abaixo mostra como implementar a lógica de repetição no aplicativo. O snippet de código de exemplo tenta uma solicitação de banco de dados a cada 60 segundos (até cinco vezes) até que ela seja bem-sucedida. O número e a frequência de repetições podem ser configurados com base nas necessidades do aplicativo.

No código, substitua <cluster> pelo nome do cluster e <senha> pela senha do administrador.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Próximas etapas