Dela via


Använda Python för att ansluta och köra SQL-kommandon i Azure Cosmos DB för PostgreSQL

GÄLLER FÖR: Azure Cosmos DB for PostgreSQL (drivs av Citus-databastillägget till PostgreSQL)

Den här snabbstarten visar hur du använder Python-kod för att ansluta till ett kluster och använder SQL-instruktioner för att skapa en tabell. Sedan infogar, frågar, uppdaterar och tar du bort data i databasen. Stegen i den här artikeln förutsätter att du är bekant med Python-utveckling och är nybörjare på att arbeta med Azure Cosmos DB för PostgreSQL.

Installera PostgreSQL-biblioteket

Kodexemplen i den här artikeln kräver biblioteket psycopg2 . Du måste installera psycopg2 med språkpakethanteraren (till exempel pip).

Ansluta, skapa en tabell och infoga data

I följande kodexempel skapas en anslutningspool till Postgres-databasen. Den använder sedan cursor.execute-funktioner med SQL CREATE TABLE- och INSERT INTO-instruktioner för att skapa en tabell och infoga data.

Dricks

Exempelkoden nedan använder en anslutningspool för att skapa och hantera anslutningar till PostgreSQL. Anslutningspooler på programsidan rekommenderas starkt eftersom:

  • Det säkerställer att programmet inte genererar för många anslutningar till databasen och undviker därför att överskrida anslutningsgränserna.
  • Det kan hjälpa dig att drastiskt förbättra prestanda – både svarstid och dataflöde. PostgreSQL-serverprocessen måste förgrenas för att hantera varje ny anslutning, och återanvändning av en anslutning undviker detta.

I följande kod ersätter du <klustret> med ditt klusternamn och <lösenord> med administratörslösenordet eller Microsoft Entra-ID-token.

Kommentar

Det här exemplet stänger anslutningen i slutet, så om du vill köra de andra exemplen i artikeln i samma session ska du inte ta med # Clean up avsnittet när du kör det här exemplet.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

När koden körs genererar den följande utdata:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Distribuera tabeller

Azure Cosmos DB for PostgreSQL ger dig superkraften att distribuera tabeller över flera noder för skalbarhet. Med kommandot nedan kan du distribuera en tabell. Du kan läsa mer om create_distributed_table och distributionskolumnen här.

Kommentar

Genom att distribuera tabeller kan de växa över alla arbetsnoder som läggs till i klustret.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Läsa data

I följande kodexempel används följande API:er för att läsa data från databasen:

  • cursor.execute med SQL SELECT-instruktionen för att läsa data.
  • cursor.fetchall() för att acceptera en fråga och returnera ett resultat inställt på iterate.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Uppdatera data

I följande kodexempel används cursor.execute med SQL UPDATE-instruktionen för att uppdatera data.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Ta bort data

Följande kodexempel körs cursor.execute med SQL DELETE-instruktionen för att ta bort data.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

COPY-kommando för snabb inmatning

COPY-kommandot kan ge ett enormt dataflöde när data matas in i Azure Cosmos DB for PostgreSQL. Kommandot COPY kan mata in data i filer eller från mikrobats med data i minnet för inmatning i realtid.

COPY-kommando för att läsa in data från en fil

Följande kod kopierar data från en CSV-fil till en databastabell. Koden kräver filen pharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

COPY-kommandot för att läsa in minnesintern data

Följande kod kopierar minnesintern data till en tabell.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Försök igen för fel med databasbegäran

Det är ibland möjligt att databasbegäranden från ditt program misslyckas. Sådana problem kan inträffa i olika scenarier, till exempel nätverksfel mellan app och databas, felaktigt lösenord osv. Vissa problem kan vara tillfälliga och lösa sig själva inom några sekunder till minuter. Du kan konfigurera omprövningslogik i appen för att lösa de tillfälliga felen.

Om du konfigurerar omförsökslogik i din app kan du förbättra slutanvändarupplevelsen. Under felscenarier väntar användarna bara lite längre på att programmet ska hantera begäranden i stället för att uppleva fel.

Exemplet nedan visar hur du implementerar logik för återförsök i din app. Exempelkodfragmentet försöker en databasbegäran var 60:e sekund (upp till fem gånger) tills den lyckas. Antalet och frekvensen för återförsök kan konfigureras baserat på programmets behov.

I den här koden ersätter du <klustret> med ditt klusternamn och <lösenord> med administratörslösenordet.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Nästa steg