Поделиться через


Использование Node.js для подключения и запуска команд SQL в Azure Cosmos DB для PostgreSQL

Область применения: Azure Cosmos DB для PostgreSQL (на базе расширения базы данных Citus до PostgreSQL)

В этом кратком руководстве показано, как использовать Node.js код для подключения к кластеру и использовать инструкции SQL для создания таблицы. Затем вы вставляете, запрашиваете, обновляете и удаляете данные в базе данных. В этой статье предполагается, что вы знакомы с Node.js разработке и не знакомы с Azure Cosmos DB для PostgreSQL.

Установка библиотеки PostgreSQL

Примеры кода, приведенные в этой статье, требуют , чтобы библиотека pg была интерфейсна с сервером PostgreSQL. Вам потребуется установить pg с помощью диспетчера языковых пакетов (например, npm).

Подключение, создание таблицы и вставка данных

Создание общего модуля подключения

Совет

В приведенном ниже примере кода используется пул подключений для создания подключений к PostgreSQL и управления ими. Настоятельно рекомендуется использовать пул подключений на стороне приложения, так как:

  • Это гарантирует, что приложение не будет создавать слишком много подключений к базе данных, поэтому можно будет избежать превышения ограничений на подключения.
  • Это может значительно повысить производительность — как с точки зрения задержки, так и с точки зрения пропускной способности. Процесс сервера PostgreSQL должен разделиться, чтобы обработать каждое новое подключение, а повторное использование подключения позволяет избежать этих издержек.

Создайте папку с именем db и в этой папке создайте файл citus.js , содержащий следующий общий код подключения. В этом коде замените <кластер> именем кластера и <паролем паролем> администратора.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Создание таблицы

Используйте приведенный ниже код для подключения и загрузки данных с помощью инструкций SQL CREATE TABLE и INSERT INTO. Код создает новую pharmacy таблицу и вставляет некоторые примеры данных.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Распределение таблиц

Azure Cosmos DB для PostgreSQL предоставляет супер возможности распределения таблиц между несколькими узлами для масштабируемости. С помощью приведенной ниже команды можно распределить таблицу. Дополнительные сведения о create_distributed_table и столбце распределения см. здесь.

Примечание.

Распределение таблиц позволяет им увеличиваться по всем рабочим узлам, добавленным в кластер.

Используйте следующий код для подключения к базе данных и распространения таблицы:

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Чтение данных

Используйте указанный ниже код с инструкцией SQL SELECT для подключения и чтения данных.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Обновление данных

Используйте указанный ниже код с инструкцией SQL UPDATE для подключения и обновления данных.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Удаление данных

Используйте указанный ниже код с инструкцией SQL DELETE для подключения и удаления данных.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Команда COPY для быстрого приема

Команда COPY может обеспечить огромную пропускную способность при приеме данных в Azure Cosmos DB для PostgreSQL. Команда COPY может принимать данные в файлах или из микропакетов данных в памяти для приема в реальном времени.

Команда COPY для загрузки данных из файла

Следующий код копирует данные из CSV-файла в таблицу базы данных. Для кода требуется пакет pg-copy-streams и файл pharmacies.csv.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

Команда COPY для загрузки данных в памяти

Следующий код копирует данные в память в таблицу. Для кода требуется пакет через2 , который позволяет выполнять цепочку каналов.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Повторные попытки приложения для сбоев запросов к базе данных

Иногда запросы к базе данных из приложения могут завершаться ошибкой. Такие проблемы могут возникать при различных сценариях, включая сбой сети между приложением и базой данных, неправильный пароль и т. д. Некоторые проблемы могут быть временными и самостоятельно устраняться в течение нескольких секунд или минут. Для устранения временных ошибок можно настроить в приложении логику повторных попыток.

Настройка логики повторных попыток в приложении помогает улучшить взаимодействие с пользователем. В сценариях сбоя пользователи просто будут немного дольше ждать, пока приложение обслужит запросы, а не столкнутся с ошибками.

В приведенном ниже примере показано, как реализовать логику повторных попыток в вашем приложении. Фрагмент примера кода пытается выполнить запрос к базе данных каждые 60 секунд (до пяти раз) до достижения требуемого результата. Количество и частоту повторных попыток можно настроить в зависимости от требований вашего приложения.

В этом коде замените <кластер> именем кластера и <паролем паролем> администратора.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Следующие шаги