중요합니다
Azure Cosmos DB for PostgreSQL은 더 이상 새 프로젝트에 지원되지 않습니다. 새 프로젝트에는 이 서비스를 사용하지 마세요. 대신 다음 두 서비스 중 하나를 사용합니다.
99.999% SLA(가용성 서비스 수준 약정), 인스턴트 자동 크기 조정 및 여러 지역에서 자동 장애 조치(failover)를 사용하는 대규모 시나리오용으로 설계된 분산 데이터베이스 솔루션에는 NoSQL용 Azure Cosmos DB를 사용합니다.
오픈 소스 Citus 확장을 사용하여 분할된 PostgreSQL용 Azure Database for PostgreSQL의 탄력적 클러스터 기능을 사용합니다.
이 빠른 시작에서는 Python 코드를 사용하여 클러스터에 연결하고 SQL 문을 사용하여 테이블을 만드는 방법을 보여 줍니다. 그런 다음, 데이터베이스에서 데이터를 삽입, 쿼리, 업데이트 및 삭제합니다. 이 문서의 단계에서는 사용자가 Python 개발에 익숙하고 Azure Cosmos DB for PostgreSQL을 처음 사용한다고 가정합니다.
PostgreSQL 라이브러리 설치
이 문서의 코드 예에는 psycopg2 라이브러리가 필요합니다. 언어 패키지 관리자(예: pip)와 함께 psycopg2를 설치해야 합니다.
연결, 테이블 만들기 및 데이터 삽입
다음 코드 예제에서는 Postgres 데이터베이스에 대한 연결 풀을 만듭니다. 그런 다음 SQL CREATE TABLE 및 INSERT INTO 문과 함께 cursor.execute 함수를 사용하여 테이블을 만들고 데이터를 삽입합니다.
팁
아래 샘플 코드는 연결 풀을 사용하여 PostgreSQL에 대한 연결을 만들고 관리합니다. 애플리케이션 측 연결 풀링은 다음과 같은 이유로 강력히 권장됩니다.
- 애플리케이션이 데이터베이스에 너무 많은 연결을 생성하지 않도록 하여 연결 제한을 초과하지 않도록 합니다.
- 대기 시간과 처리량 모두에서 성능을 크게 개선시키는 데 도움이 될 수 있습니다. PostgreSQL 서버 프로세스는 각각의 새로운 연결을 처리하기 위해 분기해야 하며 연결을 재사용하면 이러한 오버헤드를 피할 수 있습니다.
다음 코드에서 클러스터<를 클러스터 이름 및 >암호<로 관리자 암호 또는 Microsoft Entra ID 토큰으로 바꿉>니다.
참고 항목
이 예제에서는 마지막에 연결을 닫으므로 동일한 세션의 아티클에서 다른 샘플을 실행하려는 경우 이 샘플을 실행할 때 # Clean up 섹션을 포함하지 마세요.
import psycopg2
from psycopg2 import pool
# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
print("Connection pool created successfully")
# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")
# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")
# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")
# Insert some data into the table
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")
# Clean up
conn.commit()
cursor.close()
conn.close()
코드가 성공적으로 실행되면 다음과 같은 출력이 생성됩니다.
Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data
테이블 배포
Azure Cosmos DB for PostgreSQL은 확장성을 위해 여러 노드에 걸쳐 테이블을 분산하는 강력한 기능을 제공합니다. 아래 명령을 사용하면 테이블을 배포할 수 있습니다.
create_distributed_table에서 및 배포 열에 대해 자세히 알아볼 수 있습니다.
참고 항목
테이블을 분산하면 클러스터에 추가된 모든 작업자 노드에서 확장할 수 있습니다.
# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")
데이터 읽기
다음 코드 예제에서는 다음 API를 사용하여 데이터베이스에서 데이터를 읽습니다.
- SQL SELECT에서 cursor.execute를 사용하여 데이터를 읽습니다.
- cursor.fetchall()은 쿼리를 수락하고 반복할 결과 집합을 반환합니다.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()
# Print all rows
for row in rows:
print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))
데이터 업데이트
다음 코드 예제에서는 SQL UPDATE 문에서 cursor.execute를 사용하여 데이터를 업데이트합니다.
# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")
데이터 삭제
다음 코드 예제에서는 SQL DELETE 문에서 cursor.execute를 실행하여 데이터를 삭제합니다.
# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")
빠른 수집을 위한 COPY 명령
COPY 명령은 데이터를 Azure Cosmos DB for PostgreSQL로 수집하는 동안 엄청난 처리량을 낼 수 있습니다. COPY 명령은 실시간 수집을 위해 파일의 데이터 또는 메모리에 있는 데이터의 마이크로 일괄 처리에서 데이터를 수집할 수 있습니다.
파일에서 데이터를 로드하는 COPY 명령
다음 코드는 CSV 파일에서 데이터베이스 테이블로 데이터를 복사합니다. 코드에 pharmacies.csv 파일이 필요합니다.
with open('pharmacies.csv', 'r') as f:
# Notice that we don't need the `csv` module.
next(f) # Skip the header row.
cursor.copy_from(f, 'pharmacy', sep=',')
print("copying data completed")
메모리 내 데이터를 로드하는 COPY 명령
다음 코드는 메모리 내 데이터를 테이블에 복사합니다.
data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)
buf.seek(0)
with conn.cursor() as cur:
cur.copy_from(buf, "pharmacy", sep=",")
conn.commit()
conn.close()
데이터베이스 요청 실패에 대한 앱 다시 시도
애플리케이션의 데이터베이스 요청이 실패하는 경우가 있습니다. 이러한 문제는 앱과 데이터베이스 간의 네트워크 오류, 잘못된 암호 등과 같은 다양한 시나리오에서 발생할 수 있습니다. 일부 문제는 일시적일 수 있으며 몇 초에서 몇 분 안에 자체적으로 해결됩니다. 일시적인 오류를 극복하도록 앱에서 다시 시도 논리를 구성할 수 있습니다.
앱에서 다시 시도 논리를 구성하면 최종 사용자 환경을 향상시키는 데 도움이 됩니다. 오류 시나리오에서 사용자는 오류가 발생하지 않고 애플리케이션에서 요청을 처리할 때까지 조금 더 기다리게 됩니다.
아래 예제에서는 앱에서 다시 시도 논리를 구현하는 방법을 보여 줍니다. 이 샘플 코드 조각은 성공할 때까지 60초마다(최대 5회) 데이터베이스 요청을 시도합니다. 다시 시도 횟수 및 빈도는 애플리케이션의 요구 사항에 따라 구성할 수 있습니다.
이 코드에서 <cluster>를 클러스터 이름으로, <password>를 관리자 암호로 바꿉니다.
import psycopg2
import time
from psycopg2 import pool
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
def executeRetry(query, retryCount):
for x in range(retryCount):
try:
if (postgreSQL_pool):
# Use getconn() to Get Connection from connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
break
except Exception as err:
print(err)
postgreSQL_pool.putconn(conn)
time.sleep(60)
return None
print(executeRetry("select 1", 5))
다음 단계
- Azure Cosmos DB for PostgreSQL API가 어떻게 PostgreSQL을 확장하는지 알아보고 유용한 진단 쿼리를 사용해 보세요.
- 워크로드에 가장 적합한 클러스터 크기 선택
- 클러스터 성능 모니터링