使用 Python 在 Azure Cosmos DB for PostgreSQL 上連線和執行 SQL 命令

適用於: Azure Cosmos DB for PostgreSQL (由 PostgreSQL 的 Citus 資料庫延伸模組提供)

此快速入門說明如何使用 Python 程式碼來連線到叢集,並使用 SQL 陳述式來建立資料表。 接著,您會在資料庫中插入、查詢、更新和刪除資料。 本文中的步驟假設您已熟悉 Python 開發,但為 Azure Cosmos DB for PostgreSQL 的新使用者。

安裝 PostgreSQL 程式庫

此文章中的程式碼範例需要 psycopg2 程式庫。 您必須使用自己的語言套件管理員 (例如 pip) 來安裝 psycopg2。

連線、建立資料表及插入資料

下列程式碼範例會建立 Postgres 資料庫的連線集區。 然後,其會搭配 SQL CREATE TABLE 和 INSERT INTO 陳述式使用 cursor.execute 函數,來建立資料表和插入資料。

提示

以下範例程式碼使用連線集區來建立及管理與 PostgreSQL 的連線。 我們強烈建議使用應用程式端連線共用,原因如下:

  • 可確保應用程式不會產生過多資料庫連線,進而避免超過連線限制。
  • 可協助大幅改善效能,包括延遲和輸送量。 PostgreSQL 伺服器處理序必須進行派生才能處理每個新連線,而重複使用連線可避免派生帶來的負擔。

在下列程式碼中,以您的叢集名稱取代 <cluster>,並以管理員密碼取代 <password>。

注意

此範例會在結束時關閉連線,因此如果您想要在同一工作階段中執行文章中的其他範例,請勿在執行此範例時包含 # Clean up 區段。

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

程式碼會在成功執行後產生下列輸出:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

散發資料表

Azure Cosmos DB for PostgreSQL 可提供您在多個節點之間散發資料表的強大功能,以取得可擴縮性。 下列命令可讓您分散資料表。 您可以在這裡深入了解 create_distributed_table 和分散資料行。

注意

散發資料表可讓其在新增至叢集的任何背景工作節點上成長。

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

讀取資料

下列程式碼範例會使用下列 API 從資料庫讀取資料:

# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

更新資料

下列程式碼範例會使用 cursor.execute 搭配 SQL UPDATE 陳述式以更新資料。

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

刪除資料

下列程式碼範例使用 SQL DELETE 陳述式來執行 cursor.execute 以刪除資料。

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

適用於快速擷取的 COPY 命令

COPY 命令會在將資料內嵌至 Azure Cosmos DB for PostgreSQL 時產生極大的輸送量。 COPY 命令可以在檔案中擷取資料,或從記憶體中的微批次資料擷取以進行即時擷取。

用來從檔案載入資料的 COPY 命令

下列程式碼會將資料從 CSV 檔案複製到資料庫資料表。 此程式碼需要檔案 pharmacies.csv

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

用來載入記憶體內部資料的 COPY 命令

下列程式碼會將記憶體內部資料複製到資料表。

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

適用於資料庫要求失敗的應用程式重試

有時候,來自您應用程式的資料庫要求可能會失敗。 這類問題可能在不同的情況下發生,例如應用程式與資料庫之間的網路失敗、密碼不正確等。有些問題可能是暫時性的,而且會在幾秒到幾分鐘內自行解決。 您可以在應用程式中設定重試邏輯,以克服暫時性錯誤。

在應用程式中設定重試邏輯有助於改善使用者體驗。 在失敗案例中,使用者只會多等一些時間讓應用程式服務要求,而不會遇到錯誤。

下列範例示範如何在應用程式中實作重試邏輯。 範例程式碼片段會每隔 60 秒嘗試一次資料庫要求 (最多五次) 直到成功為止。 您可以根據應用程式的需求來設定重試次數和頻率。

在此程式碼中,以叢集名稱取代 <cluster>,並以管理員密碼取代 <password>。

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

下一步