Verwenden von Node.js zum Herstellen einer Verbindung und zum Ausführen von SQL-Befehlen in Azure Cosmos DB for PostgreSQL

GILT FÜR: Azure Cosmos DB for PostgreSQL (unterstützt von der Citus-Datenbankerweiterung auf PostgreSQL)

In dieser Schnellstartanleitung erfahren Sie, wie Sie mithilfe von Node.js-Code eine Verbindung mit einem Cluster herstellen und SQL-Anweisungen zum Erstellen einer Tabelle verwenden. Anschließend werden Sie Daten in die Datenbank einfügen, abfragen, aktualisieren und löschen. Bei den Schritten in diesem Artikel wird davon vorausgesetzt, dass Sie mit der Entwicklung in Node.js vertraut sind. Es ist jedoch keine Erfahrung im Umgang mit Azure Cosmos DB for PostgreSQL erforderlich.

Installieren der PostgreSQL-Bibliothek

Für die Codebeispiele in diesem Artikel ist die pg-Bibliothek zur Verbindung mit dem PostgreSQL-Server erforderlich. Sie müssen pg mit Ihrem Sprachpaket-Manager (z. B. npm) installieren.

Herstellen einer Verbindung, Erstellen einer Tabelle und Einfügen von Daten

Erstellen des allgemeinen Verbindungsmoduls

Tipp

Der folgende Beispielcode verwendet einen Verbindungspool, um Verbindungen zu PostgreSQL zu erstellen und zu verwalten. Anwendungsseitiges Verbindungspooling wird dringend empfohlen, weil:

  • Es stellt sicher, dass die Anwendung nicht zu viele Verbindungen zur Datenbank herstellt und somit ein Überschreiten der Verbindungslimits vermieden wird.
  • Sie kann dazu beitragen, die Leistung drastisch zu verbessern - sowohl die Latenzzeit als auch den Durchsatz. Der PostgreSQL-Serverprozess muss freihand geben, um jede neue Verbindung zu behandeln und eine Verbindung wiederzuverwenden, um diesen Aufwand zu vermeiden.

Erstellen Sie einen Ordner namens db und in diesem Ordner eine Datei namens citus.js, die den folgenden allgemeinen Verbindungscode enthält. Ersetzen Sie in diesem Code <cluster> durch den Namen Ihres Clusters und <password> durch Ihr Administratorkennwort.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Erstellen einer Tabelle

Verwenden Sie den folgenden Code, um eine Verbindung herzustellen und die Daten zu laden, indem Sie die SQL-Anweisungen CREATE TABLE und INSERT INTO verwenden. Der Code erstellt eine neue Tabelle vom Typ pharmacy und fügt einige Beispieldaten ein.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Verteilen von Tabellen

Mit Azure Cosmos DB for PostgreSQL können Sie Tabellen knotenübergreifend verteilen, um Skalierbarkeit zu ermöglichen. Mit dem folgenden Befehl können Sie eine Tabelle verteilen. Weitere Informationen zu create_distributed_table und zur Verteilungsspalte finden Sie hier.

Hinweis

Verteilte Tabellen werden auf alle Workerknoten platziert, die dem Cluster hinzugefügt werden.

Verwenden Sie den folgenden Code, um eine Verbindung mit der Datenbank herzustellen und die Tabelle zu verteilen.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Lesen von Daten

Verwenden Sie den folgenden Code, um die Daten mit einer SQL-Anweisung des Typs SELECT zu verbinden und zu lesen.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Aktualisieren von Daten

Verwenden Sie den folgenden Code, um die Daten mit einer SQL-Anweisung des Typs UPDATE zu verbinden und zu aktualisieren.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Löschen von Daten

Verwenden Sie den folgenden Code, um die Daten mit einer SQL-Anweisung des Typs DELETE zu verbinden und zu löschen.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

COPY-Befehl für eine schnelle Datenerfassung

Der COPY-Befehl kann bei der Erfassung von Daten in Azure Cosmos DB for PostgreSQL einen enormen Durchsatz erzielen. Der COPY-Befehl kann Daten in Dateien erfassen oder Mikrobatches von Daten im Arbeitsspeicher in Echtzeit erfassen.

COPY-Befehl zum Laden von Daten aus einer Datei

Mit dem folgenden Code werden Daten aus einer CSV-Datei in eine Datenbanktabelle kopiert. Für den Code werden das Paket pg-copy-streams und die Datei pharmacies.csv benötigt.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

COPY-Befehl zum Laden von In-Memory-Daten

Mit dem folgenden Code werden In-Memory-Daten in eine Tabelle kopiert. Für den Code wird das Paket through2 benötigt, um die Verkettung von Pipes zu ermöglichen.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

App-Wiederholung bei Datenbankanforderungsfehlern

Manchmal passiert es, dass Datenbankanforderungen an Ihre Anwendung fehlschlagen. Solche Probleme können in verschiedenen Szenarien auftreten, z. B. bei Netzwerkfehlern zwischen App und Datenbank, bei einem falschen Kennwort usw. Manche Probleme sind temporär und lassen sich in wenigen Sekunden bis Minuten lösen. Sie können die Retry-Logik in Ihrer App konfigurieren, um die vorübergehenden Fehler zu bewältigen.

Durch das Konfigurieren der Retry-Logik in Ihrer App lässt sich die Benutzerfreundlichkeit für den Endbenutzers verbessern. In einem Fehlerszenario wartet der Benutzer lediglich etwas länger, bis die Anwendung Anforderungen ausgibt und keine Fehler mehr auftreten.

Im nachfolgenden Beispiel sehen Sie, wie Sie die Retry-Logik in Ihrer App implementieren. Der Beispielcodeschnipsel versucht es alle 60 Sekunden mit einer Datenbankanforderung (bis zu fünf Mal), bis er erfolgreich ist. Die Anzahl und Häufigkeit der Retries kann basierend auf den Anforderungen Ihrer Anwendung konfiguriert werden.

Ersetzen Sie in diesem Code <cluster> durch den Namen Ihres Clusters und <password> durch Ihr Administratorkennwort.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Nächste Schritte