Sdílet prostřednictvím


Použití Node.js k připojení a spuštění příkazů SQL ve službě Azure Cosmos DB for PostgreSQL

PLATÍ PRO: Azure Cosmos DB for PostgreSQL (využívající rozšíření databáze Citus do PostgreSQL)

V tomto rychlém startu se dozvíte, jak pomocí Node.js kódu připojit ke clusteru a pomocí příkazů SQL vytvořit tabulku. Potom vložíte data do databáze, budete je dotazovat, aktualizovat a odstraňovat. Kroky v tomto článku předpokládají, že znáte Node.js vývoj a začínáte pracovat se službou Azure Cosmos DB for PostgreSQL.

Instalace knihovny PostgreSQL

Příklady kódu v tomto článku vyžadují , aby knihovna pg pro rozhraní se serverem PostgreSQL. Pg budete muset nainstalovat pomocí správce jazykových balíčků (například npm).

Připojení, vytvoření tabulky a vložení dat

Vytvoření společného modulu připojení

Tip

Následující ukázkový kód používá fond připojení k vytvoření a správě připojení k PostgreSQL. Sdružování připojení na straně aplikace se důrazně doporučuje, protože:

  • Zajišťuje, že aplikace nevygeneruje příliš mnoho připojení k databázi, a proto se vyhne překročení limitů připojení.
  • Může výrazně zlepšit výkon – latenci i propustnost. Proces serveru PostgreSQL musí vytvořit fork pro zpracování každého nového připojení a opětovným využitím připojení se této režii vyhnout.

Vytvořte složku s názvem db a uvnitř této složky vytvořte soubor citus.js , který obsahuje následující společný kód připojení. V tomto kódu nahraďte <cluster> názvem clusteru a <heslem> správce.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Vytvoření tabulky

Pomocí následujícího kódu se připojte a nahrajte data s využitím příkazů CREATE TABLE a INSERT INTO jazyka SQL. Kód vytvoří novou pharmacy tabulku a vloží několik ukázkových dat.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Distribuce tabulek

Azure Cosmos DB for PostgreSQL poskytuje super výkon distribuce tabulek napříč několika uzly pro zajištění škálovatelnosti. Následující příkaz umožňuje distribuovat tabulku. Další informace o create_distributed_table distribučním sloupci najdete tady.

Poznámka:

Distribuce tabulek umožňuje růst mezi všechny pracovní uzly přidané do clusteru.

Pomocí následujícího kódu se připojte k databázi a distribuujte tabulku.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Čtení dat

Pomocí následujícího kódu se připojte a načtěte data s využitím příkazu SELECT jazyka SQL.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Aktualizace dat

Pomocí následujícího kódu se připojte a aktualizujte data s využitím příkazu UPDATE jazyka SQL.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Odstranění dat

Pomocí následujícího kódu se připojte a odstraňte data s využitím příkazu DELETE jazyka SQL.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Příkaz COPY pro rychlý příjem dat

Příkaz COPY může přinést obrovskou propustnost při příjmu dat do služby Azure Cosmos DB for PostgreSQL. Příkaz COPY může ingestovat data v souborech nebo z mikrodávek dat v paměti pro příjem dat v reálném čase.

Příkaz COPY pro načtení dat ze souboru

Následující kód zkopíruje data ze souboru CSV do databázové tabulky. Kód vyžaduje balíček pg-copy-streams a soubor pharmacies.csv.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

Příkaz COPY pro načtení dat v paměti

Následující kód zkopíruje data v paměti do tabulky. Kód vyžaduje balíček až 2 , který umožňuje řetězení potrubí.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Opakování aplikace kvůli selhání žádostí o databázi

Někdy je možné, že databázové požadavky z vaší aplikace selžou. K takovým problémům může dojít v různých scénářích, jako je selhání sítě mezi aplikací a databází, nesprávné heslo atd. Některé problémy můžou být přechodné a během několika sekund až minut se vyřeší. Logiku opakování v aplikaci můžete nakonfigurovat tak, aby se překončily přechodné chyby.

Konfigurace logiky opakování ve vaší aplikaci pomáhá zlepšit uživatelské prostředí. Ve scénářích selhání uživatelé budou jen chvíli čekat, než aplikace bude obsluhovat požadavky, místo aby došlo k chybám.

Následující příklad ukazuje, jak implementovat logiku opakování ve vaší aplikaci. Ukázkový fragment kódu se pokusí o požadavek databáze každých 60 sekund (až pětkrát), dokud nebude úspěšný. Počet a četnost opakování je možné nakonfigurovat na základě potřeb vaší aplikace.

V tomto kódu nahraďte <cluster> názvem clusteru a <heslem> správce.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Další kroky