Použití Pythonu k připojení a spouštění příkazů SQL ve službě Azure Cosmos DB for PostgreSQL

PLATÍ PRO: Azure Cosmos DB for PostgreSQL (využívá rozšíření databáze Citus na PostgreSQL)

V tomto rychlém startu se dozvíte, jak se pomocí kódu Pythonu připojit ke clusteru a pomocí příkazů SQL vytvořit tabulku. Potom v databázi vložíte data, budete je dotazovat, aktualizovat a odstranit. Kroky v tomto článku předpokládají, že máte zkušenosti s vývojem v Pythonu a začínáte pracovat se službou Azure Cosmos DB for PostgreSQL.

Instalace knihovny PostgreSQL

Příklady kódu v tomto článku vyžadují knihovnu psycopg2 . Budete muset nainstalovat psycopg2 pomocí správce jazykových balíčků (například pip).

Připojení, vytvoření tabulky a vložení dat

Následující příklad kódu vytvoří fond připojení k databázi Postgres. Pak pomocí funkcí cursor.execute s příkazy SQL CREATE TABLE a INSERT INTO vytvoří tabulku a vloží data.

Tip

Následující ukázkový kód používá fond připojení k vytvoření a správě připojení k PostgreSQL. Sdružování připojení na straně aplikace se důrazně doporučuje, protože:

  • Zajišťuje, že aplikace negeneruje příliš mnoho připojení k databázi, a zabrání tak překročení limitů připojení.
  • Může výrazně zlepšit výkon – latenci i propustnost. Proces serveru PostgreSQL musí vytvořit fork, aby zvládl každé nové připojení, a opakovaného použádání připojení zabrání této režii.

V následujícím kódu nahraďte <cluster> názvem clusteru a <heslem> vaším heslem správce.

Poznámka

Tento příklad ukončí připojení na konci, takže pokud chcete spustit další ukázky v článku ve stejné relaci, nezahrnujte do tohoto oddílu # Clean up při spuštění této ukázky.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Když se kód úspěšně spustí, vytvoří následující výstup:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Distribuce tabulek

Azure Cosmos DB for PostgreSQL nabízí super výkon při distribuci tabulek napříč několika uzly pro zajištění škálovatelnosti. Následující příkaz umožňuje distribuovat tabulku. Další informace o create_distributed_table a distribučním sloupci najdete tady.

Poznámka

Distribuce tabulek umožňuje jejich růst napříč všemi pracovními uzly přidanou do clusteru.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Čtení dat

Následující příklad kódu používá ke čtení dat z databáze následující rozhraní API:

# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Aktualizace dat

Následující příklad kódu používá cursor.execute s příkazem SQL UPDATE k aktualizaci dat.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Odstranění dat

Následující příklad kódu se spustí cursor.execute s příkazem SQL DELETE, který odstraní data.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

Příkaz COPY pro rychlý příjem dat

Příkaz COPY může při ingestování dat do služby Azure Cosmos DB for PostgreSQL poskytovat obrovskou propustnost . Příkaz COPY může ingestovat data v souborech nebo z mikrodávek dat v paměti pro příjem dat v reálném čase.

Příkaz COPY pro načtení dat ze souboru

Následující kód zkopíruje data ze souboru CSV do databázové tabulky. Kód vyžaduje soubor pharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

Příkaz COPY pro načtení dat v paměti

Následující kód zkopíruje data v paměti do tabulky.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Opakování pokusu aplikace o selhání žádostí o databázi

Někdy je možné, že žádosti o databázi z vaší aplikace selžou. K takovým problémům může docházet v různých scénářích, jako je selhání sítě mezi aplikací a databází, nesprávné heslo atd. Některé problémy můžou být přechodné a samy se vyřeší během několika sekund až minut. V aplikaci můžete nakonfigurovat logiku opakování, aby se přechodné chyby vyřešily.

Konfigurace logiky opakování ve vaší aplikaci pomáhá zlepšit prostředí koncového uživatele. Ve scénářích selhání budou uživatelé pouze chvíli čekat, než aplikace obslouží požadavky, místo aby došlo k chybám.

Následující příklad ukazuje, jak do aplikace implementovat logiku opakování. Ukázkový fragment kódu zkouší požadavek databáze každých 60 sekund (až pětkrát), dokud nebude úspěšný. Počet a četnost opakování je možné nakonfigurovat na základě potřeb vaší aplikace.

V tomto kódu nahraďte <cluster> názvem a <heslem> vašeho clusteru heslem správce.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Další kroky