다음을 통해 공유


Node.js를 사용하여 Azure Cosmos DB for PostgreSQL에서 SQL 명령 연결 및 실행

적용 대상: Azure Cosmos DB for PostgreSQL(PostgreSQL에 대한 Citus 데이터베이스 확장 기반)

이 빠른 시작에서는 Node.js 코드를 사용하여 클러스터에 연결하고 SQL 문을 사용하여 테이블을 만드는 방법을 보여 줍니다. 그런 다음, 데이터베이스에서 데이터를 삽입, 쿼리, 업데이트 및 삭제합니다. 이 문서의 단계에서는 사용자가 Node.js 개발에 익숙하고 Azure Cosmos DB for PostgreSQL을 처음 사용한다고 가정합니다.

PostgreSQL 라이브러리 설치

이 문서의 코드 예제에서는 PostgreSQL 서버와 인터페이스하는 pg 라이브러리가 필요합니다. 언어 패키지 관리자(예: npm)와 함께 pg를 설치해야 합니다.

연결, 테이블 만들기 및 데이터 삽입

공통 연결 모듈 만들기

아래 샘플 코드는 연결 풀을 사용하여 PostgreSQL에 대한 연결을 만들고 관리합니다. 애플리케이션 측 연결 풀링은 다음과 같은 이유로 강력히 권장됩니다.

  • 애플리케이션이 데이터베이스에 너무 많은 연결을 생성하지 않도록 하여 연결 제한을 초과하지 않도록 합니다.
  • 대기 시간과 처리량 모두에서 성능을 크게 개선시키는 데 도움이 될 수 있습니다. PostgreSQL 서버 프로세스는 각각의 새로운 연결을 처리하기 위해 분기해야 하며 연결을 재사용하면 이러한 오버헤드를 피할 수 있습니다.

db라는 폴더를 만들고 이 폴더 내에 다음과 같은 공통 연결 코드가 포함된 citus.js 파일을 만듭니다. 이 코드에서 <cluster>를 클러스터 이름으로, <password>를 관리자 암호로 바꿉니다.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

테이블 만들기

CREATE TABLE 및 INSERT INTO SQL 문을 사용하여 데이터를 연결하고 로드하려면 다음 코드를 사용하세요. 이 코드는 새 pharmacy 테이블을 만들고 일부 샘플 데이터를 삽입합니다.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

테이블 배포

Azure Cosmos DB for PostgreSQL은 확장성을 위해 여러 노드에 걸쳐 테이블을 분산하는 강력한 기능을 제공합니다. 아래 명령을 사용하면 테이블을 배포할 수 있습니다. 여기에서 create_distributed_table 및 배포 열에 대해 자세히 알아볼 수 있습니다.

참고 항목

테이블을 분산하면 클러스터에 추가된 모든 작업자 노드에서 확장할 수 있습니다.

다음 코드를 사용하여 데이터베이스에 연결하고 테이블을 배포합니다.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

데이터 읽기

SELECT SQL 문을 사용하여 데이터를 연결하고 읽으려면 다음 코드를 사용하세요.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

데이터 업데이트

UPDATE SQL 문을 사용하여 데이터를 연결하고 업데이트하려면 다음 코드를 사용하세요.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

데이터 삭제

DELETE SQL 문을 사용하여 데이터를 연결하고 삭제하려면 다음 코드를 사용하세요.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

빠른 수집을 위한 COPY 명령

COPY 명령은 데이터를 Azure Cosmos DB for PostgreSQL로 수집하는 동안 엄청난 처리량을 낼 수 있습니다. COPY 명령은 실시간 수집을 위해 파일의 데이터 또는 메모리에 있는 데이터의 마이크로 일괄 처리에서 데이터를 수집할 수 있습니다.

파일에서 데이터를 로드하는 COPY 명령

다음 코드는 CSV 파일에서 데이터베이스 테이블로 데이터를 복사합니다. 코드에는 pg-copy-streams 패키지 및 파일 pharmacies.csv가 필요합니다.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

메모리 내 데이터를 로드하는 COPY 명령

다음 코드는 메모리 내 데이터를 테이블에 복사합니다. 코드에는 파이프 체인을 허용하는 through2 패키지가 필요합니다.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

데이터베이스 요청 실패에 대한 앱 다시 시도

애플리케이션의 데이터베이스 요청이 실패하는 경우가 있습니다. 이러한 문제는 앱과 데이터베이스 간의 네트워크 오류, 잘못된 암호 등과 같은 다양한 시나리오에서 발생할 수 있습니다. 일부 문제는 일시적일 수 있으며 몇 초에서 몇 분 안에 자체적으로 해결됩니다. 일시적인 오류를 극복하도록 앱에서 다시 시도 논리를 구성할 수 있습니다.

앱에서 다시 시도 논리를 구성하면 최종 사용자 환경을 향상시키는 데 도움이 됩니다. 오류 시나리오에서 사용자는 오류가 발생하지 않고 애플리케이션에서 요청을 처리할 때까지 조금 더 기다리게 됩니다.

아래 예제에서는 앱에서 다시 시도 논리를 구현하는 방법을 보여 줍니다. 이 샘플 코드 조각은 성공할 때까지 60초마다(최대 5회) 데이터베이스 요청을 시도합니다. 다시 시도 횟수 및 빈도는 애플리케이션의 요구 사항에 따라 구성할 수 있습니다.

이 코드에서 <cluster>를 클러스터 이름으로, <password>를 관리자 암호로 바꿉니다.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

다음 단계