Sdílet prostřednictvím


Použití Pythonu k připojení a spuštění příkazů SQL ve službě Azure Cosmos DB for PostgreSQL

PLATÍ PRO: Azure Cosmos DB for PostgreSQL (využívající rozšíření databáze Citus do PostgreSQL)

V tomto rychlém startu se dozvíte, jak se pomocí kódu Pythonu připojit ke clusteru a pomocí příkazů SQL vytvořit tabulku. Potom vložíte data do databáze, budete je dotazovat, aktualizovat a odstraňovat. Kroky v tomto článku předpokládají, že znáte vývoj v Pythonu a začínáte pracovat se službou Azure Cosmos DB for PostgreSQL.

Instalace knihovny PostgreSQL

Příklady kódu v tomto článku vyžadují knihovnu psycopg2 . Budete muset nainstalovat psycopg2 se správcem jazykových balíčků (například pip).

Připojení, vytvoření tabulky a vložení dat

Následující příklad kódu vytvoří fond připojení k vaší databázi Postgres. Pak pomocí funkcí cursor.execute s příkazy SQL CREATE TABLE a INSERT INTO vytvoří tabulku a vloží data.

Tip

Následující ukázkový kód používá fond připojení k vytvoření a správě připojení k PostgreSQL. Sdružování připojení na straně aplikace se důrazně doporučuje, protože:

  • Zajišťuje, že aplikace nevygeneruje příliš mnoho připojení k databázi, a proto se vyhne překročení limitů připojení.
  • Může výrazně zlepšit výkon – latenci i propustnost. Proces serveru PostgreSQL musí vytvořit fork pro zpracování každého nového připojení a opětovným využitím připojení se této režii vyhnout.

V následujícím kódu nahraďte <cluster názvem clusteru> a <heslem> správce.

Poznámka:

Tento příklad připojení zavře na konci, takže pokud chcete spustit další ukázky v článku ve stejné relaci, nezahrnujte # Clean up část při spuštění této ukázky.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Když se kód úspěšně spustí, vytvoří následující výstup:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Distribuce tabulek

Azure Cosmos DB for PostgreSQL poskytuje super výkon distribuce tabulek napříč několika uzly pro zajištění škálovatelnosti. Následující příkaz umožňuje distribuovat tabulku. Další informace o create_distributed_table distribučním sloupci najdete tady.

Poznámka:

Distribuce tabulek umožňuje růst mezi všechny pracovní uzly přidané do clusteru.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Čtení dat

Následující příklad kódu používá následující rozhraní API ke čtení dat z databáze:

# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Aktualizace dat

Následující příklad kódu používá cursor.execute s příkazem SQL UPDATE k aktualizaci dat.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Odstranění dat

Následující příklad kódu se spustí cursor.execute s příkazem SQL DELETE pro odstranění dat.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

Příkaz COPY pro rychlý příjem dat

Příkaz COPY může přinést obrovskou propustnost při příjmu dat do služby Azure Cosmos DB for PostgreSQL. Příkaz COPY může ingestovat data v souborech nebo z mikrodávek dat v paměti pro příjem dat v reálném čase.

Příkaz COPY pro načtení dat ze souboru

Následující kód zkopíruje data ze souboru CSV do databázové tabulky. Kód vyžaduje soubor pharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

Příkaz COPY pro načtení dat v paměti

Následující kód zkopíruje data v paměti do tabulky.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Opakování aplikace kvůli selhání žádostí o databázi

Někdy je možné, že databázové požadavky z vaší aplikace selžou. K takovým problémům může dojít v různých scénářích, jako je selhání sítě mezi aplikací a databází, nesprávné heslo atd. Některé problémy můžou být přechodné a během několika sekund až minut se vyřeší. Logiku opakování v aplikaci můžete nakonfigurovat tak, aby se překončily přechodné chyby.

Konfigurace logiky opakování ve vaší aplikaci pomáhá zlepšit uživatelské prostředí. Ve scénářích selhání uživatelé budou jen chvíli čekat, než aplikace bude obsluhovat požadavky, místo aby došlo k chybám.

Následující příklad ukazuje, jak implementovat logiku opakování ve vaší aplikaci. Ukázkový fragment kódu se pokusí o požadavek databáze každých 60 sekund (až pětkrát), dokud nebude úspěšný. Počet a četnost opakování je možné nakonfigurovat na základě potřeb vaší aplikace.

V tomto kódu nahraďte <cluster> názvem clusteru a <heslem> správce.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Další kroky