استخدام Node.js للاتصال وتشغيل أوامر SQL على Azure Cosmos DB ل PostgreSQL

ينطبق على: Azure Cosmos DB ل PostgreSQL (مدعوم بملحق قاعدة بيانات Citus إلى PostgreSQL)

يوضح لك هذا التشغيل السريع كيفية استخدام التعليمات البرمجية Node.js للاتصال بمجموعة، واستخدام عبارات SQL لإنشاء جدول. ستقوم بعد ذلك بإدراج البيانات والاستعلام عنها وتحديثها وحذفها في قاعدة البيانات. تفترض الخطوات الواردة في هذه المقالة أنك على دراية بتطوير Node.js، وأنك جديد في العمل مع Azure Cosmos DB ل PostgreSQL.

تثبيت مكتبة PostgreSQL

تتطلب أمثلة التعليمات البرمجية في هذه المقالة مكتبة pg للواجهة مع خادم PostgreSQL. ستحتاج إلى تثبيت pg مع مدير حزمة اللغة (مثل npm).

الاتصال وإنشاء جدول وإدراج البيانات

إنشاء وحدة الاتصال الشائعة

تلميح

يستخدم نموذج التعليمات البرمجية أدناه مجموعة اتصال لإنشاء اتصالات إلى PostgreSQL وإدارتها. يوصى بشدة بمجموعة الاتصال من جانب التطبيق لأن:

  • ويضمن أن التطبيق لا ينشئ اتصالات كثيرة جدًا بقاعدة البيانات، وبالتالي يتجنب تجاوز حدود الاتصال.
  • يمكن أن يساعد على تحسين الأداء بشكل كبير - كل من زمن الانتقال ومعدل النقل. يجب أن تتشعب عملية خادم PostgreSQL لمعالجة كل اتصال جديد، وأن تتجنب إعادة استخدام الاتصال هذا الحمل.

أنشئ مجلدا يسمى db، وداخل هذا المجلد قم بإنشاء ملف citus.js يحتوي على التعليمات البرمجية للاتصال الشائعة التالية. في هذه التعليمة البرمجية، استبدل <نظام المجموعة> باسم نظام المجموعة وكلمة <المرور بكلمة مرور> المسؤول.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

إنشاء جدول

استخدم التعليمات البرمجية التالية للاتصال وتحميل البيانات باستخدام عبارات CREATE TABLE وINSERT INTO SQL. تنشئ التعليمات البرمجية جدولا جديدا pharmacy وتدرج بعض بيانات العينة.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

توزيع الجداول

يمنحك Azure Cosmos DB ل PostgreSQL القوة الفائقة لتوزيع الجداول عبر عقد متعددة لقابلية التوسع. يمكنك الأمر أدناه من توزيع جدول. يمكنك معرفة المزيد حول create_distributed_tableعمود التوزيع هنا.

إشعار

يتيح لهم توزيع الجداول النمو عبر أي عقد عاملة تمت إضافتها إلى نظام المجموعة.

استخدم التعليمات البرمجية التالية للاتصال بقاعدة البيانات وتوزيع الجدول.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

اقرأ البيانات

استخدم التعليمات البرمجية التالية للاتصال وقراءة البيانات باستخدام عبارة الـ SQL: SELECT.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

تحديث البيانات

استخدم التعليمات البرمجية التالية للاتصال وتحديث البيانات باستخدام عبارة UPDATE SQL.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

حذف البيانات

استخدم التعليمات البرمجية التالية للاتصال وحذف البيانات باستخدام عبارة DELETE SQL.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

أمر COPY للاستيعاب السريع

يمكن أن ينتج عن أمر COPY إنتاجية هائلة أثناء استيعاب البيانات في Azure Cosmos DB ل PostgreSQL. يمكن لأمر COPY استيعاب البيانات في الملفات، أو من دفعات صغيرة من البيانات في الذاكرة لاستيعابها في الوقت الحقيقي.

الأمر COPY لتحميل البيانات من ملف

تنسخ التعليمات البرمجية التالية البيانات من ملف CSV إلى جدول قاعدة بيانات. تتطلب التعليمات البرمجية حزمة pg-copy-streams والملف pharmacies.csv.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

الأمر COPY لتحميل البيانات في الذاكرة

تنسخ التعليمات البرمجية التالية البيانات الموجودة في الذاكرة إلى جدول. تتطلب التعليمات البرمجية حزمة through2 ، والتي تسمح بتسلسل الأنابيب.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

إعادة محاولة التطبيق لحالات فشل طلب قاعدة البيانات

من الممكن أحياناً أن تفشل طلبات قاعدة البيانات من التطبيق. يمكن أن تحدث مثل هذه المشكلات في ظل سيناريوهات مختلفة، مثل فشل الشبكة بين التطبيق وقاعدة البيانات، وكلمة المرور غير الصحيحة، وما إلى ذلك. قد تكون بعض المشكلات عابرة، وتحل نفسها في بضع ثوان إلى دقائق. يمكنك تكوين منطق إعادة المحاولة في تطبيقك للتغلب على الأخطاء العابرة.

يساعد تكوين منطق إعادة المحاولة في تطبيقك على تحسين تجربة المستخدم. في ظل سيناريوهات الفشل، سينتظر المستخدمون وقتاً أطول قليلاً حتى يقوم التطبيق بخدمة الطلبات، بدلاً من مواجهة الأخطاء.

يوضح المثال أدناه كيفية تنفيذ منطق إعادة المحاولة في تطبيقك. يحاول نموذج القصاصة البرمجية طلب قاعدة بيانات كل 60 ثانية (حتى خمس مرات) حتى ينجح. يمكن تكوين عدد مرات إعادة المحاولة وتكرارها استناداً إلى احتياجات التطبيق.

في هذه التعليمة البرمجية، استبدل <نظام المجموعة> باسم نظام المجموعة وكلمة <المرور بكلمة مرور> المسؤول.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

الخطوات التالية