Python gebruiken om verbinding te maken en SQL-opdrachten uit te voeren in Azure Cosmos DB for PostgreSQL

VAN TOEPASSING OP: Azure Cosmos DB for PostgreSQL (mogelijk gemaakt door de Citus-database-extensie voor PostgreSQL)

In deze quickstart ziet u hoe u Python-code gebruikt om verbinding te maken met een cluster en hoe u SQL-instructies gebruikt om een tabel te maken. Vervolgens gaat u gegevens in de database invoegen, opvragen, bijwerken en verwijderen. Bij de stappen in dit artikel wordt ervan uitgegaan dat u bekend bent met Python-ontwikkeling en dat u geen ervaring hebt met het werken met Azure Cosmos DB for PostgreSQL.

PostgreSQL-bibliotheek installeren

Voor de codevoorbeelden in dit artikel is de psycopg2-bibliotheek vereist. U moet psycopg2 installeren met uw taalpakketbeheer (zoals pip).

Verbinding maken, een tabel maken en gegevens invoegen

In het volgende codevoorbeeld wordt een verbindingsgroep met uw Postgres-database gemaakt. Vervolgens worden de functies cursor.execute met SQL CREATE TABLE en INSERT INTO-instructies gebruikt om een tabel te maken en gegevens in te voegen.

Tip

In de onderstaande voorbeeldcode wordt een verbindingsgroep gebruikt om verbindingen met PostgreSQL te maken en te beheren. Groepsgewijze verbindingen aan de toepassingszijde wordt sterk aanbevolen omdat:

  • Het zorgt ervoor dat de toepassing niet te veel verbindingen met de database genereert en zo wordt voorkomen dat de verbindingslimieten worden overschreden.
  • Het kan helpen de prestaties drastisch te verbeteren, zowel latentie als doorvoer. Het PostgreSQL-serverproces moet fork zijn om elke nieuwe verbinding te verwerken. Als u een verbinding opnieuw gebruikt, wordt deze overhead vermeden.

Vervang <in de volgende code het cluster> door uw clusternaam en <wachtwoord> door uw beheerderswachtwoord.

Notitie

In dit voorbeeld wordt de verbinding aan het einde gesloten, dus als u de andere voorbeelden in het artikel in dezelfde sessie wilt uitvoeren, moet u de # Clean up sectie niet opnemen wanneer u dit voorbeeld uitvoert.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Wanneer de code wordt uitgevoerd, wordt de volgende uitvoer gegenereerd:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Tabellen distribueren

Azure Cosmos DB for PostgreSQL biedt u de superkracht van het distribueren van tabellen over meerdere knooppunten voor schaalbaarheid. Met de onderstaande opdracht kunt u een tabel distribueren. Meer informatie over create_distributed_table en de distributiekolom vindt u hier.

Notitie

Als u tabellen distribueert, kunnen deze worden verdeeld over alle werkknooppunten die aan het cluster zijn toegevoegd.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Gegevens lezen

In het volgende codevoorbeeld worden de volgende API's gebruikt om gegevens uit de database te lezen:

  • cursor.execute met de SQL SELECT-instructie om gegevens te lezen.
  • cursor.fetchall() om een query te accepteren en een resultatenset te retourneren die moet worden herhaald.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Gegevens bijwerken

In het volgende codevoorbeeld wordt gebruikgemaakt cursor.execute van de SQL UPDATE-instructie om gegevens bij te werken.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Gegevens verwijderen

Het volgende codevoorbeeld wordt uitgevoerd cursor.execute met de SQL DELETE-instructie om de gegevens te verwijderen.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

COPY-opdracht voor snelle opname

De opdracht COPY kan een enorme doorvoer opleveren tijdens het opnemen van gegevens in Azure Cosmos DB for PostgreSQL. Met de opdracht COPY kunnen gegevens worden opgenomen in bestanden of uit microbatches met gegevens in het geheugen voor realtime-opname.

COPY-opdracht voor het laden van gegevens uit een bestand

Met de volgende code worden gegevens uit een CSV-bestand gekopieerd naar een databasetabel. Voor de code is het bestand pharmacies.csvvereist.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

COPY-opdracht voor het laden van gegevens in het geheugen

Met de volgende code worden gegevens in het geheugen gekopieerd naar een tabel.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

App opnieuw proberen voor databaseaanvraagfouten

Het is soms mogelijk dat databaseaanvragen van uw toepassing mislukken. Dergelijke problemen kunnen zich voordoen in verschillende scenario's, zoals netwerkfouten tussen app en database, onjuist wachtwoord, enzovoort. Sommige problemen kunnen tijdelijk zijn en zichzelf binnen enkele seconden tot minuten oplossen. U kunt logica voor opnieuw proberen in uw app configureren om de tijdelijke fouten te verhelpen.

Het configureren van logica voor opnieuw proberen in uw app helpt de eindgebruikerservaring te verbeteren. Bij foutscenario's wachten gebruikers slechts iets langer totdat de toepassing aanvragen verwerkt, in plaats van fouten te ervaren.

In het onderstaande voorbeeld ziet u hoe u logica voor opnieuw proberen implementeert in uw app. Het voorbeeldcodefragment probeert elke 60 seconden (maximaal vijf keer) een databaseaanvraag uit te voeren totdat dit lukt. Het aantal en de frequentie van nieuwe pogingen kunnen worden geconfigureerd op basis van de behoeften van uw toepassing.

Vervang <in deze code het cluster> door uw clusternaam en <wachtwoord> door uw beheerderswachtwoord.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Volgende stappen