Utiliser Python pour connecter et exécuter des commandes SQL sur Azure Cosmos DB for PostgreSQL

S’APPLIQUE À : Azure Cosmos DB for PostgreSQL (avec l’extension de base de données Citus pour PostgreSQL)

Ce guide de démarrage rapide vous montre comment utiliser du code Python pour vous connecter à un cluster et utiliser des instructions SQL pour créer une table. Ensuite, vous insérerez, interrogerez, mettrez à jour et supprimerez des données dans la base de données. Cet article suppose que vous connaissez les bases du développement Python, et que vous ne savez pas utiliser Azure Cosmos DB for PostgreSQL.

Installer la bibliothèque PostgreSQL

Les exemples de code de cet article nécessitent la bibliothèque psycopg2. Vous devez installer psycopg2 avec votre gestionnaire de package de langage (par exemple pip).

Se connecter, créer une table et insérer des données

L’exemple de code suivant crée un pool de connexions pour votre base de données Postgres. Il utilise ensuite des fonctions cursor.execute avec des instructions SQL CREATE TABLE et INSERT INTO pour créer une table et insérer des données.

Conseil

L’exemple de code ci-dessous utilise un pool de connexions pour créer et gérer des connexions à PostgreSQL. Le regroupement de connexions côté application est fortement recommandé, car :

  • Elle garantit que l’application ne génère pas trop de connexions à la base de données, et évite ainsi de dépasser les limites de connexion.
  • Cela peut contribuer à améliorer considérablement les performances, à la fois la latence et le débit. Le processus serveur PostgreSQL doit être dupliqué pour gérer chaque nouvelle connexion et réutiliser une connexion, ce qui évite cette surcharge.

Dans le code suivant, remplacez <cluster> par le nom de votre cluster et <mot de passe> par votre mot de passe d’administration.

Notes

Cet exemple fermant la connexion à la fin, si vous souhaitez exécuter les autres exemples de l’article dans la même session, n’incluez pas la section # Clean up lors de l’exécution de cet exemple.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Lorsque le code est exécuté correctement, il produit le résultat suivant :

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Distribuer des tables

Azure Cosmos DB for PostgreSQL vous offre la super puissance de distribution de tables sur plusieurs nœuds à des fins de scalabilité. La commande ci-dessous vous permet de distribuer une table. Vous pouvez en savoir plus sur create_distributed_table et la colonne de distribution ici.

Notes

La distribution de tables leur permet de croître sur tous les nœuds Worker ajoutés au cluster.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Lire les données

L’exemple de code suivant utilise les API suivantes pour lire les données de la base de données :

  • cursor.execute avec l’instruction SQL SELECT pour lire les données.
  • cursor.fetchall() pour accepter une requête et retourner un jeu de résultats pour l’itération.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Mettre à jour des données

L’exemple de code suivant utilise cursor.execute avec l’instruction SQL UPDATE pour mettre à jour les données.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Suppression de données

L’exemple de code suivant exécute cursor.execute avec l’instruction SQL DELETE pour supprimer les données.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

Commande COPY pour l’ingestion rapide

La commande COPY peut générer un débit considérable lors de l’ingestion de données dans Azure Cosmos DB for PostgreSQL. La commande COPY peut ingérer des données dans des fichiers ou à partir de micro-lots de données en mémoire pour l’ingestion en temps réel.

Commande COPY pour charger des données à partir d’un fichier

Le code suivant copie des données à partir d’un fichier CSV dans une table de base de données. Le code requiert le fichier pharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

Commande COPY pour charger des données en mémoire

Le code suivant copie des données en mémoire dans une table.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Nouvelle tentative d’application pour les échecs de requête de base de données

Il arrive parfois que les requêtes de base de données en provenance de votre application échouent. Ces problèmes peuvent se produire dans différents scénarios, comme une défaillance réseau entre une application et une base de données, un mot de passe incorrect, etc. Certains problèmes peuvent être temporaires et se résoudre d’eux-mêmes en quelques secondes ou quelques minutes. Vous pouvez configurer la logique de nouvelle tentative dans votre application pour surmonter les erreurs temporaires.

La configuration de la logique de nouvelle tentative dans votre application permet d’améliorer l’expérience de l’utilisateur final. Dans les scénarios de défaillance, les utilisateurs attendent simplement un peu plus longtemps que l’application traite les requêtes, au lieu d’être confrontés à des erreurs.

L’exemple ci-dessous montre comment implémenter une logique de nouvelle tentative dans votre application. L’exemple d’extrait de code tente une requête de base de données toutes les 60 secondes (jusqu’à cinq fois) jusqu’à ce qu’elle réussisse. Le nombre et la fréquence des nouvelles tentatives peuvent être configurés en fonction des besoins de votre application.

Dans ce code, remplacez <cluster> par le nom de votre cluster et <mot de passe> par votre mot de passe d’administration.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Étapes suivantes