SQL-parancsok csatlakoztatása és futtatása a Python használatával az Azure Cosmos DB for PostgreSQL-ben

A KÖVETKEZŐKRE VONATKOZIK: Azure Cosmos DB for PostgreSQL (a Citus-adatbázis PostgreSQL-hez való kiterjesztése)

Ez a rövid útmutató bemutatja, hogyan csatlakozhat Python-kóddal egy fürthöz, és hogyan hozhat létre táblát SQL-utasítások használatával. Ezután adatokat szúr be, kérdez le, frissít és töröl az adatbázisban. A cikkben szereplő lépések feltételezik, hogy ismeri a Python-fejlesztést, és még nem ismeri a PostgreSQL-hez készült Azure Cosmos DB használatát.

PostgreSQL-kódtár telepítése

A cikkben szereplő kód példákhoz a psycopg2 kódtárra van szükség. Telepítenie kell a psycopg2-t a nyelvi csomagkezelővel (például pip).

Csatlakozás, tábla létrehozása és adatok beszúrása

Az alábbi példakód létrehoz egy kapcsolatkészletet a Postgres-adatbázishoz. Ezután a cursor.execute függvényeket használja az SQL CREATE TABLE és az INSERT INTO utasítások használatával egy tábla létrehozásához és adatok beszúrásához.

Tipp

Az alábbi mintakód egy kapcsolatkészletet használ a PostgreSQL-kapcsolatok létrehozásához és kezeléséhez. Az alkalmazásoldali kapcsolatkészletezés erősen ajánlott, mert:

  • Biztosítja, hogy az alkalmazás ne hozzon létre túl sok kapcsolatot az adatbázissal, így elkerülhető a kapcsolati korlátok túllépése.
  • Ez segíthet a teljesítmény drasztikus javításában – a késés és az átviteli sebesség szempontjából is. A PostgreSQL-kiszolgáló folyamatának elágaztatást kell végeznie az egyes új kapcsolatok kezeléséhez, és a kapcsolatok újrafelhasználása elkerüli ezt a többletterhelést.

Az alábbi kódban cserélje le <a fürtöt> a fürt nevére és <jelszavára> a rendszergazdai jelszóra.

Megjegyzés

Ez a példa lezárja a kapcsolatot a végén, így ha a cikkben szereplő többi mintát ugyanabban a munkamenetben szeretné futtatni, ne foglalja bele a szakaszt a # Clean up minta futtatásakor.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Ha a kód sikeresen fut, a következő kimenetet hozza létre:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Táblák elosztása

Az Azure Cosmos DB for PostgreSQL a táblák több csomópont közötti elosztásának szuperképességét biztosítja a méretezhetőség érdekében. Az alábbi parancs lehetővé teszi egy tábla terjesztését. Erről és a terjesztési oszlopról create_distributed_tableitt tudhat meg többet.

Megjegyzés

A táblák elosztása lehetővé teszi számukra, hogy a fürthöz hozzáadott munkavégző csomópontok között növekedjenek.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Adatok olvasása

Az alábbi példakód a következő API-kat használja az adatbázisból származó adatok beolvasásához:

  • cursor.execute az SQL SELECT utasítással az adatok olvasásához.
  • cursor.fetchall() egy lekérdezés elfogadásához és egy iterátumra beállított eredményhalmaz visszaadásához.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Adatok frissítése

Az alábbi példakód az SQL UPDATE utasítással frissíti cursor.execute az adatokat.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Adat törlése

Az alábbi példakód az SQL DELETE utasítással fut cursor.execute az adatok törléséhez.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

COPY parancs a gyors betöltéshez

A COPY parancs hatalmas átviteli sebességet eredményezhet, miközben adatokat betölt az Azure Cosmos DB for PostgreSQL-be. A COPY parancs betöltheti az adatokat fájlokba, vagy a memóriában lévő adatok mikro kötegeiből valós idejű betöltéshez.

COPY (MÁSOLÁS) parancs az adatok fájlból való betöltéséhez

Az alábbi kód egy CSV-fájlból egy adatbázistáblába másolja az adatokat. A kódhoz a fájlpharmacies.csvszükséges .

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

COPY parancs a memóriában lévő adatok betöltéséhez

Az alábbi kód a memórián belüli adatokat egy táblába másolja.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Alkalmazás újrapróbálkozása az adatbázis-kérelmek hibáihoz

Előfordulhat, hogy az alkalmazás adatbázis-kérelmei meghiúsulnak. Az ilyen problémák különböző forgatókönyvek esetén fordulhatnak elő, például az alkalmazás és az adatbázis közötti hálózati hiba, a helytelen jelszó stb. Egyes problémák átmenetiek lehetnek, és néhány másodperc és perc alatt megoldhatják magukat. Az átmeneti hibák elhárításához konfigurálhatja az újrapróbálkozási logikát az alkalmazásban.

Az újrapróbálkozási logika konfigurálása az alkalmazásban segít a végfelhasználói élmény javításában. Hibaforgatókönyvek esetén a felhasználók csak egy kicsit tovább várnak, amíg az alkalmazás kiszolgálja a kéréseket ahelyett, hogy hibákat tapasztalnak.

Az alábbi példa bemutatja, hogyan implementálhat újrapróbálkozési logikát az alkalmazásban. A mintakódrészlet 60 másodpercenként (legfeljebb öt alkalommal) kísérel meg adatbázis-kérést, amíg sikeres nem lesz. Az újrapróbálkozések száma és gyakorisága az alkalmazás igényei szerint konfigurálható.

Ebben a kódban cserélje le <a fürtöt> a fürt nevére és <jelszavára> a rendszergazdai jelszóra.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Következő lépések