Verwenden von Python zum Herstellen einer Verbindung und zum Ausführen von SQL-Befehlen in Azure Cosmos DB for PostgreSQL

GILT FÜR: Azure Cosmos DB for PostgreSQL (unterstützt von der Citus-Datenbankerweiterung auf PostgreSQL)

In dieser Schnellstartanleitung erfahren Sie, wie Sie mithilfe von Python-Code eine Verbindung mit einem Cluster herstellen und SQL-Anweisungen zum Erstellen einer Tabelle verwenden. Anschließend werden Sie Daten in die Datenbank einfügen, abfragen, aktualisieren und löschen. Für die Anleitung in diesem Artikel benötigen Sie Kenntnisse in der Entwicklung in Python. Es ist jedoch keine Erfahrung im Umgang mit Azure Cosmos DB for PostgreSQL erforderlich.

Installieren der PostgreSQL-Bibliothek

Für die Codebeispiele in diesem Artikel ist die psycopg2-Bibliothek erforderlich. Sie müssen psycopg2 mit Ihrem Sprachpaket-Manager (z. B. pip) installieren.

Herstellen einer Verbindung, Erstellen einer Tabelle und Einfügen von Daten

Das folgende Codebeispiel erstellt einen Verbindungspool zu Ihrer Postgres-Datenbank. Anschließend werden cursor.execute-Funktionen mit den SQL-Anweisungen CREATE TABLE und INSERT INTO verwendet, um eine Tabelle zu erstellen und Daten einzufügen.

Tipp

Der folgende Beispielcode verwendet einen Verbindungspool, um Verbindungen zu PostgreSQL zu erstellen und zu verwalten. Anwendungsseitiges Verbindungspooling wird dringend empfohlen, weil:

  • Es stellt sicher, dass die Anwendung nicht zu viele Verbindungen zur Datenbank herstellt und somit ein Überschreiten der Verbindungslimits vermieden wird.
  • Sie kann dazu beitragen, die Leistung drastisch zu verbessern - sowohl die Latenzzeit als auch den Durchsatz. Der PostgreSQL-Serverprozess muss für jede neue Verbindung einen Fork ausführen, und die Wiederverwendung einer Verbindung vermeidet diesen Overhead.

Ersetzen Sie im folgenden Code <cluster> durch den Namen Ihres Clusters und <password> durch Ihr Administratorkennwort.

Hinweis

In diesem Beispiel wird die Verbindung am Ende geschlossen. Wenn Sie also die anderen Stichproben im Artikel in derselben Sitzung ausführen möchten, sollten Sie den Abschnitt # Clean up beim Ausführen dieser Stichprobe nicht einschließen.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Nach erfolgreicher Ausführung generiert der Code die folgende Ausgabe:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Verteilen von Tabellen

Mit Azure Cosmos DB for PostgreSQL können Sie Tabellen knotenübergreifend verteilen, um Skalierbarkeit zu ermöglichen. Mit dem folgenden Befehl können Sie eine Tabelle verteilen. Weitere Informationen zu create_distributed_table und zur Verteilungsspalte finden Sie hier.

Hinweis

Verteilte Tabellen werden auf alle Workerknoten platziert, die dem Cluster hinzugefügt werden.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Lesen von Daten

Im folgenden Codebeispiel werden die folgenden APIs zum Lesen von Daten aus der Datenbank verwendet:

  • cursor.execute mit der SQL-Anweisung SELECT zum Lesen der Daten
  • cursor.fetchall() akzeptiert eine Abfrage und gibt ein Resultset zurück, das durchlaufen werden kann.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Aktualisieren von Daten

Im folgenden Codebeispiel wird cursor.execute mit der SQL-Anweisung UPDATE zum Aktualisieren der Daten verwendet.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Löschen von Daten

Im folgenden Codebeispiel wird cursor.execute mit der DELETE-SQL-Anweisung zum Löschen der Daten verwendet.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

COPY-Befehl für eine schnelle Datenerfassung

Der COPY-Befehl kann bei der Erfassung von Daten in Azure Cosmos DB for PostgreSQL einen enormen Durchsatz erzielen. Der COPY-Befehl kann Daten in Dateien erfassen oder Mikrobatches von Daten im Arbeitsspeicher in Echtzeit erfassen.

COPY-Befehl zum Laden von Daten aus einer Datei

Mit dem folgenden Code werden Daten aus einer CSV-Datei in eine Datenbanktabelle kopiert. Für den Code ist die Datei pharmacies.csv erforderlich.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

COPY-Befehl zum Laden von Daten im Arbeitsspeicher

Mit dem folgenden Code werden speicherinterne Daten in eine Tabelle kopiert.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

App-Wiederholung bei Datenbankanforderungsfehlern

Manchmal passiert es, dass Datenbankanforderungen an Ihre Anwendung fehlschlagen. Solche Probleme können in verschiedenen Szenarien auftreten, z. B. bei Netzwerkfehlern zwischen App und Datenbank, bei einem falschen Kennwort usw. Manche Probleme sind temporär und lassen sich in wenigen Sekunden bis Minuten lösen. Sie können die Retry-Logik in Ihrer App konfigurieren, um die vorübergehenden Fehler zu bewältigen.

Durch das Konfigurieren der Retry-Logik in Ihrer App lässt sich die Benutzerfreundlichkeit für den Endbenutzers verbessern. In einem Fehlerszenario wartet der Benutzer lediglich etwas länger, bis die Anwendung Anforderungen ausgibt und keine Fehler mehr auftreten.

Im nachfolgenden Beispiel sehen Sie, wie Sie die Retry-Logik in Ihrer App implementieren. Der Beispielcodeschnipsel versucht es alle 60 Sekunden mit einer Datenbankanforderung (bis zu fünf Mal), bis er erfolgreich ist. Die Anzahl und Häufigkeit der Retries kann basierend auf den Anforderungen Ihrer Anwendung konfiguriert werden.

Ersetzen Sie in diesem Code <cluster> durch den Namen Ihres Clusters und <password> durch Ihr Administratorkennwort.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Nächste Schritte