Megosztás a következőn keresztül:


SQL-parancsok csatlakoztatása és futtatása a Python használatával az Azure Cosmos DB for PostgreSQL-ben

A KÖVETKEZŐKRE VONATKOZIK: Azure Cosmos DB for PostgreSQL (a Citus adatbázisbővítménye a PostgreSQL-re)

Ez a rövid útmutató bemutatja, hogyan csatlakozhat Python-kóddal egy fürthöz, és hogyan hozhat létre táblát SQL-utasításokkal. Ezután adatokat szúrhat be, kérdezhet le, frissíthet és törölhet az adatbázisban. A cikk lépései feltételezik, hogy ismeri a Python-fejlesztést, és még nem ismeri az Azure Cosmos DB for PostgreSQL használatát.

A PostgreSQL-kódtár telepítése

A cikkben szereplő kód példákhoz a psycopg2 kódtárra van szükség. Telepítenie kell a psycopg2-t a nyelvi csomagkezelővel (például pip).

Csatlakozás, tábla létrehozása és adatok beszúrása

Az alábbi példakód létrehoz egy kapcsolatkészletet a Postgres-adatbázishoz. Ezután a kurzor.execute függvényeket használja az SQL CREATE TABLE és az INSERT INTO utasításokkal egy tábla létrehozásához és adatok beszúrásához.

Tipp.

Az alábbi mintakód egy kapcsolatkészletet használ a PostgreSQL-kapcsolatok létrehozásához és kezeléséhez. Az alkalmazásoldali kapcsolatkészletezés erősen ajánlott, mert:

  • Biztosítja, hogy az alkalmazás ne hozzon létre túl sok kapcsolatot az adatbázishoz, így elkerülheti a kapcsolati korlátok túllépését.
  • Ez jelentősen javíthatja a teljesítményt – a késést és az átviteli sebességet is. A PostgreSQL-kiszolgáló folyamatának elágaztatást kell végeznie az egyes új kapcsolatok kezeléséhez, és a kapcsolat újrahasználása elkerüli ezt a többletterhelést.

Az alábbi kódban cserélje le <a fürtöt> a fürt nevére és <jelszavára> a rendszergazdai jelszóra.

Feljegyzés

Ez a példa lezárja a kapcsolatot a végén, ezért ha a cikkben szereplő többi mintát ugyanabban a munkamenetben szeretné futtatni, ne vegye fel a szakaszt a # Clean up minta futtatásakor.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Ha a kód sikeresen fut, a következő kimenetet hozza létre:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Táblák elosztása

Az Azure Cosmos DB for PostgreSQL lehetővé teszi a táblák több csomópont közötti elosztását a méretezhetőség érdekében. Az alábbi parancs lehetővé teszi egy tábla terjesztését. Erről és a terjesztési oszlopról create_distributed_table itt olvashat bővebben.

Feljegyzés

A táblák elosztása lehetővé teszi, hogy a fürthöz hozzáadott munkavégző csomópontok között növekedjenek.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Adatok beolvasása

Az alábbi példakód az alábbi API-k használatával olvassa be az adatokat az adatbázisból:

# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Adatok frissítése

Az alábbi példakód az SQL UPDATE utasítással használja cursor.execute az adatok frissítését.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Adatok törlése

Az alábbi példakód az SQL DELETE utasítással fut cursor.execute az adatok törléséhez.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

COPY parancs a gyors betöltéshez

A COPY parancs hatalmas átviteli sebességet eredményezhet, miközben adatokat tölt be az Azure Cosmos DB for PostgreSQL-be. A COPY parancs betöltheti az adatokat fájlokban, vagy a memóriában lévő adatok mikro kötegeiből valós idejű betöltéshez.

MÁSOLÁS parancs fájlból származó adatok betöltéséhez

Az alábbi kód egy CSV-fájlból egy adatbázistáblába másolja az adatokat. A kódhoz a fájl pharmacies.csv szükséges.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

COPY parancs a memóriában lévő adatok betöltéséhez

Az alábbi kód a memóriában lévő adatokat egy táblába másolja.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Alkalmazás újrapróbálkozása az adatbázis-kérelmek hibáihoz

Előfordulhat, hogy az alkalmazás adatbázis-kérelmei sikertelenek. Az ilyen problémák különböző forgatókönyvek esetén fordulhatnak elő, például az alkalmazás és az adatbázis közötti hálózati hiba, helytelen jelszó stb. Egyes problémák átmenetiek lehetnek, és néhány másodperc és perc alatt megoldhatják magukat. Az átmeneti hibák elhárításához konfigurálhatja az újrapróbálkozási logikát az alkalmazásban.

Az újrapróbálkozási logika konfigurálása az alkalmazásban segít a végfelhasználói élmény javításában. Hibaforgatókönyvek esetén a felhasználók csupán egy kicsit tovább várnak, amíg az alkalmazás a kéréseket kiszolgálja ahelyett, hogy hibákat tapasztalnak.

Az alábbi példa bemutatja, hogyan implementálhatja az újrapróbálkozás logikáját az alkalmazásban. A mintakódrészlet 60 másodpercenként (legfeljebb ötször) kísérel meg egy adatbázis-kérést, amíg sikeres nem lesz. Az újrapróbálkozések száma és gyakorisága az alkalmazás igényei szerint konfigurálható.

Ebben a kódban cserélje le <a fürtöt> a fürt nevére és <jelszavára> a rendszergazdai jelszóra.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Következő lépések