Uso de Node.js para conectarse y ejecutar comandos SQL en Azure Cosmos DB for PostgreSQL

SE APLICA A: Azure Cosmos DB for PostgreSQL (con tecnología de la extensión de base de datos de Citus en PostgreSQL)

En este inicio rápido se muestra cómo usar el código de Node.js para conectarse a un clúster y usar instrucciones SQL para crear una tabla. A continuación, insertará, consultará, actualizará y eliminará datos de la base de datos. En los pasos de este artículo se da por hecho que está familiarizado con el desarrollo de Node.js, pero que nunca ha trabajado con Azure Cosmos DB for PostgreSQL.

Instalación de la biblioteca de PostgreSQL

Los ejemplos de código de este artículo necesitan que la biblioteca pg interactúe con el servidor PostgreSQL. Deberá instalar pg con el administrador de paquetes de idioma (por ejemplo, npm).

Conexión, creación de una tabla e inserción de datos

Creación del módulo de conexión común

Sugerencia

El siguiente código de ejemplo usa un grupo de conexiones para crear y administrar las conexiones a PostgreSQL. Se recomienda encarecidamente la agrupación de conexiones en el lado de la aplicación porque:

  • Garantiza que la aplicación no genere demasiadas conexiones a la base de datos, lo que evita que se superen los límites de conexiones.
  • Puede ayudar a mejorar drásticamente el rendimiento, tanto la latencia como el procesamiento. El proceso del servidor PostgreSQL debe bifurcarse para controlar cada nueva conexión y reutilizar una conexión evita esa sobrecarga.

Cree una carpeta denominada db y, dentro de ella, cree un archivo citus.js que contenga el siguiente código de conexión común. En este código, reemplace el <clúster> por el nombre del clúster y la <contraseña> por la contraseña de administrador.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Creación de una tabla

Use el código siguiente para conectarse y cargar los datos mediante las instrucciones SQL CREATE TABLE e INSERT INTO. El código crea una tabla pharmacy e inserta algunos datos de ejemplo.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Distribución de tablas

Azure Cosmos DB for PostgreSQL le proporciona la habilidad de distribuir tablas entre varios nodos para mejorar la escalabilidad. El uso del siguiente comando le permitirá distribuir una tabla. Puede obtener más información sobre create_distributed_table y la columna de distribución aquí.

Nota

La distribución de tablas les permite crecer en todos los nodos de trabajo que se han agregado al clúster.

Use el siguiente código para conectarse a la base de datos y distribuir la tabla.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Lectura de datos

Use el código siguiente para conectarse y leer los datos mediante la instrucción SQL SELECT.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Actualización de datos

Use el código siguiente para conectarse y actualizar los datos mediante la instrucción SQL UPDATE.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Eliminación de datos

Use el código siguiente para conectarse y leer los datos mediante la instrucción SQL DELETE.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Comando COPY para llevar a cabo una ingesta rápida

El comando COPY es capaz de conseguir un rendimiento enorme cuando ingiere datos en Azure Cosmos DB for PostgreSQL. El comando COPY puede ingerir datos ubicados en archivos o microprocesos de datos ubicados en memoria durante un proceso de ingesta en tiempo real.

Uso del comando COPY para cargar datos desde un archivo

El código siguiente copia datos de un archivo .csv a una tabla de base de datos. El código requiere el paquete pg-copy-streams y el archivo pharmacies.csv.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

Comando COPY para cargar datos almacenados en memoria

El código siguiente copia en una tabla los datos almacenados en memoria. El código requiere el paquete through2, que permite el encadenamiento de canalizaciones.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Reintento de la aplicación para errores de solicitud de la base de datos

A veces es posible que las solicitudes de base de datos de la aplicación produzcan un error. Estos problemas pueden producirse en diferentes escenarios, como errores de red entre la aplicación y la base de datos, contraseñas incorrectas, etc. Algunos problemas pueden ser transitorios y resolverse por sí mismos en unos segundos o minutos. Puede configurar la lógica de reintento en la aplicación para resolver los errores transitorios.

Configurar la lógica de reintento en la aplicación ayuda a mejorar la experiencia del usuario final. En escenarios de error, los usuarios simplemente esperarán un poco más para que la aplicación atienda las solicitudes, en lugar de experimentar errores.

En el ejemplo siguiente se muestra cómo implementar la lógica de reintento en la aplicación. El fragmento de código de ejemplo intenta una solicitud de base de datos cada 60 segundos (hasta cinco veces) hasta que se realiza correctamente. El número y la frecuencia de los reintentos se pueden configurar en función de las necesidades de la aplicación.

En este código, reemplace el <clúster> por el nombre del clúster y la <contraseña> por la contraseña de administrador.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Pasos siguientes