Python을 사용하여 Azure Cosmos DB for PostgreSQL에서 SQL 명령 연결 및 실행

적용 대상: Azure Cosmos DB for PostgreSQL(PostgreSQL에 대한 Citus 데이터베이스 확장 기반)

이 빠른 시작에서는 Python 코드를 사용하여 클러스터에 연결하고 SQL 문을 사용하여 테이블을 만드는 방법을 보여 줍니다. 그런 다음, 데이터베이스에서 데이터를 삽입, 쿼리, 업데이트 및 삭제합니다. 이 문서의 단계에서는 사용자가 Python 개발에 익숙하고 Azure Cosmos DB for PostgreSQL을 처음 사용한다고 가정합니다.

PostgreSQL 라이브러리 설치

이 문서의 코드 예에는 psycopg2 라이브러리가 필요합니다. 언어 패키지 관리자(예: pip)와 함께 psycopg2를 설치해야 합니다.

연결, 테이블 만들기 및 데이터 삽입

다음 코드 예제에서는 Postgres 데이터베이스에 대한 연결 풀을 만듭니다. 그런 다음 SQL CREATE TABLE 및 INSERT INTO 문과 함께 cursor.execute 함수를 사용하여 테이블을 만들고 데이터를 삽입합니다.

아래 샘플 코드는 연결 풀을 사용하여 PostgreSQL에 대한 연결을 만들고 관리합니다. 애플리케이션 측 연결 풀링은 다음과 같은 이유로 강력히 권장됩니다.

  • 애플리케이션이 데이터베이스에 너무 많은 연결을 생성하지 않도록 하여 연결 제한을 초과하지 않도록 합니다.
  • 대기 시간과 처리량 모두에서 성능을 크게 개선시키는 데 도움이 될 수 있습니다. PostgreSQL 서버 프로세스는 각각의 새로운 연결을 처리하기 위해 분기해야 하며 연결을 재사용하면 이러한 오버헤드를 피할 수 있습니다.

다음 코드에서 <cluster>를 클러스터 이름으로 바꾸고 <password>를 관리자 암호로 바꿉니다.

참고 항목

이 예제에서는 마지막에 연결을 닫으므로 동일한 세션의 아티클에서 다른 샘플을 실행하려는 경우 이 샘플을 실행할 때 # Clean up 섹션을 포함하지 마세요.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

코드가 성공적으로 실행되면 다음과 같은 출력이 생성됩니다.

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

테이블 배포

Azure Cosmos DB for PostgreSQL은 확장성을 위해 여러 노드에 걸쳐 테이블을 분산하는 강력한 기능을 제공합니다. 아래 명령을 사용하면 테이블을 배포할 수 있습니다. 여기에서 create_distributed_table 및 배포 열에 대해 자세히 알아볼 수 있습니다.

참고 항목

테이블을 분산하면 클러스터에 추가된 모든 작업자 노드에서 확장할 수 있습니다.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

데이터 읽기

다음 코드 예제에서는 다음 API를 사용하여 데이터베이스에서 데이터를 읽습니다.

  • SQL SELECT에서 cursor.execute를 사용하여 데이터를 읽습니다.
  • cursor.fetchall()은 쿼리를 수락하고 반복할 결과 집합을 반환합니다.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

데이터 업데이트

다음 코드 예제에서는 SQL UPDATE 문에서 cursor.execute를 사용하여 데이터를 업데이트합니다.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

데이터 삭제

다음 코드 예제에서는 SQL DELETE 문에서 cursor.execute를 실행하여 데이터를 삭제합니다.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

빠른 수집을 위한 COPY 명령

COPY 명령은 데이터를 Azure Cosmos DB for PostgreSQL로 수집하는 동안 엄청난 처리량을 낼 수 있습니다. COPY 명령은 실시간 수집을 위해 파일의 데이터 또는 메모리에 있는 데이터의 마이크로 일괄 처리에서 데이터를 수집할 수 있습니다.

파일에서 데이터를 로드하는 COPY 명령

다음 코드는 CSV 파일에서 데이터베이스 테이블로 데이터를 복사합니다. 코드에 pharmacies.csv 파일이 필요합니다.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

메모리 내 데이터를 로드하는 COPY 명령

다음 코드는 메모리 내 데이터를 테이블에 복사합니다.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

데이터베이스 요청 실패에 대한 앱 다시 시도

애플리케이션의 데이터베이스 요청이 실패하는 경우가 있습니다. 이러한 문제는 앱과 데이터베이스 간의 네트워크 오류, 잘못된 암호 등과 같은 다양한 시나리오에서 발생할 수 있습니다. 일부 문제는 일시적일 수 있으며 몇 초에서 몇 분 안에 자체적으로 해결됩니다. 일시적인 오류를 극복하도록 앱에서 다시 시도 논리를 구성할 수 있습니다.

앱에서 다시 시도 논리를 구성하면 최종 사용자 환경을 향상시키는 데 도움이 됩니다. 오류 시나리오에서 사용자는 오류가 발생하지 않고 애플리케이션에서 요청을 처리할 때까지 조금 더 기다리게 됩니다.

아래 예제에서는 앱에서 다시 시도 논리를 구현하는 방법을 보여 줍니다. 이 샘플 코드 조각은 성공할 때까지 60초마다(최대 5회) 데이터베이스 요청을 시도합니다. 다시 시도 횟수 및 빈도는 애플리케이션의 요구 사항에 따라 구성할 수 있습니다.

이 코드에서 <cluster>를 클러스터 이름으로, <password>를 관리자 암호로 바꿉니다.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

다음 단계