Delen via


Python gebruiken om SQL-opdrachten te verbinden en uit te voeren in Azure Cosmos DB for PostgreSQL

VAN TOEPASSING OP: Azure Cosmos DB for PostgreSQL (mogelijk gemaakt door de Citus-database-extensie naar PostgreSQL)

In deze quickstart ziet u hoe u Python-code gebruikt om verbinding te maken met een cluster en SQL-instructies gebruikt om een tabel te maken. Vervolgens voegt u gegevens in de database in, voert u query's uit, werkt u deze bij en verwijdert u deze. In de stappen in dit artikel wordt ervan uitgegaan dat u bekend bent met de ontwikkeling van Python en geen ervaring hebt met het werken met Azure Cosmos DB voor PostgreSQL.

PostgreSQL-bibliotheek installeren

Voor de codevoorbeelden in dit artikel is de psycopg2-bibliotheek vereist. U moet psycopg2 installeren met uw taalpakketbeheer (zoals pip).

Verbinding maken, een tabel maken en gegevens invoegen

In het volgende codevoorbeeld wordt een verbindingsgroep met uw Postgres-database gemaakt. Vervolgens worden cursor.execute-functies gebruikt met SQL CREATE TABLE- en INSERT INTO-instructies om een tabel te maken en gegevens in te voegen.

Tip

In de onderstaande voorbeeldcode wordt een verbindingsgroep gebruikt om verbindingen met PostgreSQL te maken en te beheren. Groepsgewijze verbindingen aan de toepassingszijde wordt sterk aanbevolen omdat:

  • Het zorgt ervoor dat de toepassing niet te veel verbindingen met de database genereert en dus geen verbindingslimieten overschrijdt.
  • Het kan helpen de prestaties drastisch te verbeteren, zowel latentie als doorvoer. Het PostgreSQL-serverproces moet fork gebruiken om elke nieuwe verbinding te verwerken en het hergebruik van een verbinding voorkomt die overhead.

Vervang in de volgende code het cluster> door <de clusternaam en <het wachtwoord> door het beheerderswachtwoord.

Notitie

In dit voorbeeld wordt de verbinding aan het einde gesloten, dus als u de andere voorbeelden in het artikel in dezelfde sessie wilt uitvoeren, neemt u de # Clean up sectie niet op wanneer u dit voorbeeld uitvoert.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Wanneer de code wordt uitgevoerd, wordt de volgende uitvoer gegenereerd:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Tabellen distribueren

Azure Cosmos DB for PostgreSQL biedt u de superkracht van het distribueren van tabellen over meerdere knooppunten voor schaalbaarheid. Met de onderstaande opdracht kunt u een tabel distribueren. Hier vindt u meer informatie over create_distributed_table en de distributiekolom.

Notitie

Als u tabellen distribueert, kunnen ze groeien over alle werkknooppunten die aan het cluster zijn toegevoegd.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Gegevens lezen

In het volgende codevoorbeeld worden de volgende API's gebruikt om gegevens uit de database te lezen:

  • cursor.execute with the SQL SELECT statement to read data.
  • cursor.fetchall() om een query te accepteren en een resultatenset te retourneren om deze te herhalen.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Gegevens bijwerken

In het volgende codevoorbeeld wordt de SQL UPDATE-instructie gebruikt cursor.execute om gegevens bij te werken.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Gegevens verwijderen

Het volgende codevoorbeeld wordt uitgevoerd cursor.execute met de SQL DELETE-instructie om de gegevens te verwijderen.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

COPY-opdracht voor snelle opname

De opdracht COPY kan een enorme doorvoer opleveren tijdens het opnemen van gegevens in Azure Cosmos DB for PostgreSQL. Met de opdracht COPY kunt u gegevens opnemen in bestanden of uit microbatches met gegevens in het geheugen voor realtime opname.

De opdracht COPY om gegevens uit een bestand te laden

Met de volgende code worden gegevens uit een CSV-bestand gekopieerd naar een databasetabel. Voor de code is het bestand pharmacies.csv vereist.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

COPY-opdracht voor het laden van in-memory gegevens

Met de volgende code worden gegevens in het geheugen gekopieerd naar een tabel.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

App-nieuwe poging voor mislukte databaseaanvragen

Het is soms mogelijk dat databaseaanvragen van uw toepassing mislukken. Dergelijke problemen kunnen zich voordoen in verschillende scenario's, zoals netwerkfouten tussen app en database, onjuist wachtwoord, enzovoort. Sommige problemen kunnen tijdelijk zijn en zichzelf binnen enkele seconden tot minuten oplossen. U kunt logica voor opnieuw proberen in uw app configureren om de tijdelijke fouten op te lossen.

Het configureren van logica voor opnieuw proberen in uw app helpt de eindgebruikerservaring te verbeteren. In scenario's met fouten wachten gebruikers slechts wat langer totdat de toepassing aanvragen verwerkt, in plaats van fouten te ervaren.

In het onderstaande voorbeeld ziet u hoe u logica voor opnieuw proberen implementeert in uw app. Het voorbeeldcodefragment probeert elke 60 seconden (maximaal vijf keer) een databaseaanvraag uit te voeren totdat dit lukt. Het aantal en de frequentie van nieuwe pogingen kunnen worden geconfigureerd op basis van de behoeften van uw toepassing.

Vervang in deze code het cluster> door <uw clusternaam en <wachtwoord> door het beheerderswachtwoord.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Volgende stappen