Share via


PostgreSQL için Azure Cosmos DB'de SQL komutlarını bağlamak ve çalıştırmak için Node.js kullanın

ŞUNLAR IÇIN GEÇERLIDIR: PostgreSQL için Azure Cosmos DB (PostgreSQL'e citus veritabanı uzantısı tarafından desteklenir)

Bu hızlı başlangıçta bir kümeye bağlanmak için Node.js kodu kullanmayı ve tablo oluşturmak için SQL deyimlerini kullanmayı öğreneceksiniz. Ardından veritabanındaki verileri ekler, sorgular, güncelleştirir ve silersiniz. Bu makaledeki adımlarda, Node.js geliştirme hakkında bilgi sahibi olduğunuz ve PostgreSQL için Azure Cosmos DB ile çalışmaya yeni olduğunuz varsayılır.

PostgreSQL kitaplığını yükleme

Bu makaledeki kod örnekleri için pg kitaplığının PostgreSQL sunucusuyla arabirim oluşturması gerekir. Pg'yi dil paketi yöneticinizle (npm gibi) yüklemeniz gerekir.

Bağlanma, tablo oluşturma ve veri ekleme

Ortak bağlantı modülünü oluşturma

İpucu

Aşağıdaki örnek kod, PostgreSQL bağlantılarını oluşturmak ve yönetmek için bir bağlantı havuzu kullanır. Uygulama tarafı bağlantı havuzu kesinlikle önerilir çünkü:

  • Uygulamanın veritabanına çok fazla bağlantı oluşturmamasını sağlar ve bu nedenle bağlantı sınırlarının aşılmasını önler.
  • Hem gecikme süresi hem de aktarım hızı gibi performansı önemli ölçüde iyileştirmeye yardımcı olabilir. PostgreSQL sunucu işleminin her yeni bağlantıyı işlemek için çatal oluşturması gerekir ve bir bağlantıyı yeniden kullanmak bu ek yükü önler.

db adlı bir klasör oluşturun ve bu klasörün içinde aşağıdaki ortak bağlantı kodunu içeren bir citus.js dosyası oluşturun. Bu kodda, kümeyi küme adınızla ve <parolanızla> yönetici parolanızla değiştirin<.>

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Bir tablo oluşturma

CREATE TABLE ve INSERT INTO SQL deyimlerini kullanarak bağlanmak ve verileri yüklemek için aşağıdaki kodu kullanın. Kod yeni pharmacy bir tablo oluşturur ve bazı örnek veriler ekler.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Tabloları dağıtma

PostgreSQL için Azure Cosmos DB, ölçeklenebilirlik için tabloları birden çok düğüme dağıtmanın süper gücünü sağlar. Aşağıdaki komut bir tabloyu dağıtmanızı sağlar. Dağıtım sütunu ve hakkında create_distributed_table daha fazla bilgiyi burada bulabilirsiniz.

Not

Tabloların dağıtılması, kümeye eklenen çalışan düğümleri arasında büyümelerine olanak tanır.

Veritabanına bağlanmak ve tabloyu dağıtmak için aşağıdaki kodu kullanın.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Verileri okuma

Bağlanmak ve SELECT SQL deyimi kullanarak verileri okumak için aşağıdaki kodu kullanın.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Verileri güncelleştirme

Bağlanmak ve bir UPDATE SQL deyimi kullanarak verileri güncelleştirmek için aşağıdaki kodu kullanın.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Verileri silme

DELETE SQL deyimini kullanarak bağlanmak ve verileri silmek için aşağıdaki kodu kullanın.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Hızlı alma için COPY komutu

COPY komutu, PostgreSQL için Azure Cosmos DB'ye veri alırken muazzam aktarım hızı sağlayabilir. COPY komutu, gerçek zamanlı veri alımı için dosyalara veya bellekteki mikro veri toplu işlemlerinden veri alabilir.

Bir dosyadan veri yüklemek için COPY komutu

Aşağıdaki kod bir CSV dosyasındaki verileri veritabanı tablosuna kopyalar. Kod için pg-copy-streams paketi ve dosyasının pharmacies.csvgerekir.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

Bellek içi verileri yüklemek için COPY komutu

Aşağıdaki kod bellek içi verileri bir tabloya kopyalar. Kod, kanal zincirine izin veren through2 paketini gerektirir.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Veritabanı isteği hataları için uygulama yeniden deneme

Bazen uygulamanızdan gelen veritabanı istekleri başarısız olabilir. Bu tür sorunlar, uygulama ve veritabanı arasındaki ağ hatası, yanlış parola vb. gibi farklı senaryolarda oluşabilir. Bazı sorunlar geçici olabilir ve birkaç saniye ile dakika arasında çözülebilir. Geçici hataların üstesinden gelmek için uygulamanızda yeniden deneme mantığını yapılandırabilirsiniz.

Uygulamanızda yeniden deneme mantığını yapılandırmak, son kullanıcı deneyimini geliştirmeye yardımcı olur. Hata senaryolarında kullanıcılar, hatalarla karşılaşmak yerine uygulamanın isteklere hizmet vermesini yalnızca biraz daha bekler.

Aşağıdaki örnekte, uygulamanızda yeniden deneme mantığının nasıl uygulandığı gösterilmektedir. Örnek kod parçacığı, başarılı olana kadar her 60 saniyede bir (en fazla beş kez) bir veritabanı isteği dener. Yeniden deneme sayısı ve sıklığı, uygulamanızın gereksinimlerine göre yapılandırılabilir.

Bu kodda, kümeyi küme adınızla ve <parolanızla> yönetici parolanızla değiştirin<.>

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Sonraki adımlar