Usare Python per connettersi ed eseguire comandi SQL in Azure Cosmos DB for PostgreSQL

SI APPLICA A: Azure Cosmos DB for PostgreSQL (basato sull'estensione di database Citus per PostgreSQL)

Questa guida di avvio rapido illustra come usare il codice Python per connettersi a un cluster e usare istruzioni SQL per creare una tabella. Si inseriranno dati su cui eseguire query per poi aggiornarli ed eliminarli nel database. I passaggi descritti in questo articolo presuppongono che si abbia familiarità con lo sviluppo Python e non si abbia familiarità con Azure Cosmos DB for PostgreSQL.

Installare la libreria PostgreSQL

Gli esempi di codice in questo articolo richiedono la libreria psycopg2. È necessario installare psycopg2 con gestione pacchetti di linguaggio, ad esempio pip.

Connettersi, creare una tabella e inserire dati

L'esempio di codice seguente crea un pool di connessioni al database Postgres. Usa quindi le funzioni cursor.execute con istruzioni CREATE TABLE e INSERT INTO di SQL per creare una tabella e inserire dati.

Suggerimento

Il codice di esempio seguente usa un pool di connessioni per creare e gestire le connessioni a PostgreSQL. Il pool di connessioni sul lato applicazione è fortemente consigliato perché:

  • Garantisce che l'applicazione non generi troppe connessioni al database e quindi eviti di superare i limiti di connessione.
  • Può contribuire a migliorare drasticamente le prestazioni, sia in termini di latenza che di velocità effettiva. Il processo del server PostgreSQL deve creare una copia tramite fork per gestire ogni nuova connessione e il riutilizzo di una connessione evita tale sovraccarico.

Nel codice seguente sostituire <cluster> con il nome del cluster e <password> con la password amministratore.

Nota

Questo esempio chiude la connessione al termine, quindi se si desidera eseguire gli altri esempi nell'articolo nella stessa sessione, non includere la sezione # Clean up quando si esegue questo esempio.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Quando il codice viene eseguito in modo corretto, genera l'output seguente:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Distribuire le tabelle

Azure Cosmos DB for PostgreSQL offre la super potenza di distribuzione delle tabelle tra più nodi per la scalabilità. Il comando seguente consente di distribuire una tabella. Per altre informazioni su create_distributed_table e sulla colonna di distribuzione, vedere qui.

Nota

La distribuzione delle tabelle consente di aumentare le dimensioni in tutti i nodi di lavoro aggiunti al cluster.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Leggere i dati

L'esempio di codice seguente usa le API seguenti per leggere i dati dal database:

  • la funzione cursor.execute con l'istruzione SELECT di SQL per leggere i dati.
  • cursor.fetchall() per accettare una query e restituire un set di risultati per eseguire l'iterazione.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Aggiornamento dei dati

L'esempio di codice seguente usa cursor.execute con l'istruzione UPDATE di SQL per aggiornare i dati.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Eliminare dati

L'esempio di codice seguente esegue cursor.execute con l'istruzione DELETE di SQL per eliminare i dati.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

Comando COPY per l'inserimento rapido

Il comando COPY può produrre una velocità effettiva elevata durante l'inserimento di dati in Azure Cosmos DB for PostgreSQL. Il comando COPY può inserire dati in file o da micro batch di dati in memoria per l'inserimento in tempo reale.

Comando COPY per caricare dati da un file

Il codice seguente copia i dati da un file CSV a una tabella del database. Il codice richiede il file pharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

Comando COPY per caricare i dati in memoria

Il codice seguente copia i dati in memoria in una tabella.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Retry dell'app per gli errori delle richieste di database

A volte è possibile che le richieste di database eseguite dall'applicazione non riescano. Tali problemi possono verificarsi in scenari diversi, ad esempio errori di rete tra app e database, password non corretta e così via. Alcuni problemi possono essere temporanei e risolversi in pochi secondi o pochi minuti. È possibile configurare la logica di retry nell'app per risolvere gli errori temporanei.

La configurazione della logica di retry nell'app consente di migliorare l'esperienza dell'utente finale. In scenari di errore gli utenti attenderanno semplicemente un po' più a lungo per consentire all'applicazione di gestire le richieste, anziché riscontrare errori.

L'esempio seguente illustra come implementare la logica di retry nell'app. Il frammento di codice di esempio tenta una richiesta di database ogni 60 secondi (fino a cinque volte) fino a quando non riesce. Il numero e la frequenza dei tentativi possono essere configurati in base alle esigenze dell'applicazione.

In questo codice sostituire <cluster> con il nome del cluster e <password> con la password amministratore.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Passaggi successivi