Usare Python per connettersi ed eseguire comandi SQL in Azure Cosmos DB per PostgreSQL

SI APPLICA A: Postgresql

Questa guida introduttiva illustra come usare il codice Python per connettersi a un cluster e usare istruzioni SQL per creare una tabella. Si inseriranno, eseguiranno query, aggiorneranno ed elimineranno i dati nel database. I passaggi descritti in questo articolo presuppongono che si abbia familiarità con lo sviluppo python e che non si abbia familiarità con Azure Cosmos DB per PostgreSQL.

Installare la libreria PostgreSQL

Gli esempi di codice in questo articolo richiedono la libreria psycopg2 . È necessario installare psycopg2 con lo strumento di gestione pacchetti della lingua (ad esempio pip).

Connettersi, creare una tabella e inserire dati

L'esempio di codice seguente crea un pool di connessioni al database Postgres. Usa quindi le funzioni cursor.execute con istruzioni SQL CREATE TABLE e INSERT INTO per creare una tabella e inserire dati.

Suggerimento

Il codice di esempio seguente usa un pool di connessioni per creare e gestire le connessioni a PostgreSQL. Il pool di connessioni lato applicazione è fortemente consigliato perché:

  • Garantisce che l'applicazione non generi troppe connessioni al database e quindi evita di superare i limiti di connessione.
  • Può contribuire a migliorare drasticamente la latenza e la velocità effettiva. Il processo del server PostgreSQL deve creare una copia tramite fork per gestire ogni nuova connessione e riutilizzare una connessione evita tale sovraccarico.

Nel codice seguente sostituire <il cluster> con il nome e <la password> del cluster con la password dell'amministratore.

Nota

Questo esempio chiude la connessione alla fine, quindi se si vogliono eseguire gli altri esempi nell'articolo nella stessa sessione, non includere la # Clean up sezione quando si esegue questo esempio.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c.<cluster>.postgres.database.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Quando il codice viene eseguito in modo corretto, genera l'output seguente:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Distribuire le tabelle

Azure Cosmos DB per PostgreSQL offre la massima potenza di distribuzione delle tabelle tra più nodi per la scalabilità. Il comando seguente consente di distribuire una tabella. Altre informazioni e create_distributed_table la colonna di distribuzione sono disponibili qui.

Nota

La distribuzione delle tabelle consente di aumentare le dimensioni in tutti i nodi di lavoro aggiunti al cluster.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Leggere i dati

Nell'esempio di codice seguente vengono usate le API seguenti per leggere i dati dal database:

  • la funzione cursor.execute con l'istruzione SELECT di SQL per leggere i dati.
  • cursor.fetchall() per accettare una query e restituire un set di risultati per eseguire l'iterazione.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Aggiornare i dati

Nell'esempio di codice seguente viene cursor.execute usato con l'istruzione SQL UPDATE per aggiornare i dati.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Eliminare i dati

L'esempio di codice seguente viene eseguito cursor.execute con l'istruzione SQL DELETE per eliminare i dati.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

Comando COPY per l'inserimento rapido

Il comando COPY può produrre una velocità effettiva enorme durante l'inserimento dei dati in Azure Cosmos DB per PostgreSQL. Il comando COPY può inserire dati in file o da micro batch di dati in memoria per l'inserimento in tempo reale.

Comando COPY per caricare dati da un file

Il codice seguente copia i dati da un file CSV a una tabella di database. Il codice richiede il file pharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

Comando COPY per caricare dati in memoria

Il codice seguente copia i dati in memoria in una tabella.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Tentativo di ripetizione dell'app per gli errori di richiesta del database

A volte è possibile che le richieste di database dall'applicazione non riescano. Questi problemi possono verificarsi in scenari diversi, ad esempio errori di rete tra app e database, password non corretta e così via. Alcuni problemi possono essere temporanei e risolverli in pochi secondi o minuti. È possibile configurare la logica di ripetizione dei tentativi nell'app per risolvere gli errori temporanei.

La configurazione della logica di ripetizione dei tentativi nell'app consente di migliorare l'esperienza utente finale. In scenari di errore, gli utenti attenderanno semplicemente un po ' più a lungo per consentire all'applicazione di gestire le richieste, invece di riscontrare errori.

L'esempio seguente illustra come implementare la logica di ripetizione dei tentativi nell'app. Il frammento di codice di esempio tenta una richiesta di database ogni 60 secondi (fino a cinque volte) fino a quando non riesce. Il numero e la frequenza dei tentativi possono essere configurati in base alle esigenze dell'applicazione.

In questo codice sostituire <il cluster> con il nome e <la password> del cluster con la password dell'amministratore.

import psycopg2
import time
from psycopg2 import pool

host = "c.<cluster>.postgres.database.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Passaggi successivi