Nawiązywanie połączenia i uruchamianie poleceń SQL w usłudze Azure Cosmos DB for PostgreSQL za pomocą języka Python
DOTYCZY: Usługa Azure Cosmos DB for PostgreSQL (obsługiwana przez rozszerzenie bazy danych Citus do bazy danych PostgreSQL)
W tym przewodniku Szybki start pokazano, jak używać kodu w języku Python do nawiązywania połączenia z klastrem i używać instrukcji SQL do tworzenia tabeli. Następnie wstawisz, wykonasz zapytanie, zaktualizujesz i usuniesz dane w bazie danych. W krokach opisanych w tym artykule założono, że znasz programowanie w języku Python i dopiero zaczynasz pracę z usługą Azure Cosmos DB for PostgreSQL.
Instalowanie biblioteki PostgreSQL
Przykłady kodu w tym artykule wymagają biblioteki psycopg2 . Musisz zainstalować pakiet psycopg2 za pomocą menedżera pakietów językowych (na przykład).
Łączenie, tworzenie tabeli i wstawianie danych
Poniższy przykład kodu tworzy pulę połączeń z bazą danych Postgres. Następnie używa funkcji cursor.execute z instrukcjami SQL CREATE TABLE i INSERT INTO, aby utworzyć tabelę i wstawić dane.
Napiwek
Poniższy przykładowy kod używa puli połączeń do tworzenia połączeń z bazą danych PostgreSQL i zarządzania nimi. Zdecydowanie zaleca się buforowanie połączeń po stronie aplikacji, ponieważ:
- Gwarantuje to, że aplikacja nie generuje zbyt wielu połączeń z bazą danych, dlatego unika przekraczania limitów połączeń.
- Może to pomóc znacząco poprawić wydajność — zarówno opóźnienie, jak i przepływność. Proces serwera PostgreSQL musi rozwidlić do obsługi każdego nowego połączenia i ponowne użycie połączenia pozwala uniknąć tego obciążenia.
W poniższym kodzie zastąp <klaster> nazwą klastra i <hasłem> przy użyciu hasła administratora lub tokenu identyfikatora entra firmy Microsoft.
Uwaga
Ten przykład zamyka połączenie na końcu, więc jeśli chcesz uruchomić inne przykłady w artykule w tej samej sesji, nie uwzględnij # Clean up
sekcji podczas uruchamiania tego przykładu.
import psycopg2
from psycopg2 import pool
# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
print("Connection pool created successfully")
# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")
# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")
# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")
# Insert some data into the table
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")
# Clean up
conn.commit()
cursor.close()
conn.close()
Po pomyślnym uruchomieniu kodu generuje następujące dane wyjściowe:
Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data
Dystrybuowanie tabel
Usługa Azure Cosmos DB for PostgreSQL zapewnia super możliwości dystrybucji tabel między wieloma węzłami w celu zapewnienia skalowalności. Poniższe polecenie umożliwia dystrybucję tabeli. Więcej informacji na temat create_distributed_table
i kolumny dystrybucji można znaleźć tutaj.
Uwaga
Dystrybucja tabel umożliwia ich rozwijanie między węzłami procesu roboczego dodanymi do klastra.
# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")
Odczyt danych
Poniższy przykład kodu używa następujących interfejsów API do odczytywania danych z bazy danych:
- cursor.execute z instrukcją SQL SELECT w celu odczytu danych.
- cursor.fetchall() w celu zaakceptowania zapytania i zwrócenia zestawu wyników w celu iteracji.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()
# Print all rows
for row in rows:
print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))
Aktualizowanie danych
Poniższy przykład kodu używa instrukcji cursor.execute
SQL UPDATE do aktualizowania danych.
# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")
Usuwanie danych
Poniższy przykładowy kod jest uruchamiany cursor.execute
z instrukcją SQL DELETE w celu usunięcia danych.
# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")
POLECENIE COPY na potrzeby szybkiego pozyskiwania
Polecenie COPY może przynieść ogromną przepływność podczas pozyskiwania danych do usługi Azure Cosmos DB for PostgreSQL. Polecenie COPY może pozyskiwać dane w plikach lub z mikrosadów danych w pamięci na potrzeby pozyskiwania danych w czasie rzeczywistym.
POLECENIE COPY w celu załadowania danych z pliku
Poniższy kod kopiuje dane z pliku CSV do tabeli bazy danych. Kod wymaga pharmacies.csv pliku.
with open('pharmacies.csv', 'r') as f:
# Notice that we don't need the `csv` module.
next(f) # Skip the header row.
cursor.copy_from(f, 'pharmacy', sep=',')
print("copying data completed")
POLECENIE COPY w celu załadowania danych w pamięci
Poniższy kod kopiuje dane w pamięci do tabeli.
data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)
buf.seek(0)
with conn.cursor() as cur:
cur.copy_from(buf, "pharmacy", sep=",")
conn.commit()
conn.close()
Ponów próbę aplikacji dla błędów żądań bazy danych
Czasami istnieje możliwość, że żądania bazy danych z aplikacji kończą się niepowodzeniem. Takie problemy mogą wystąpić w różnych scenariuszach, takich jak awaria sieci między aplikacją i bazą danych, nieprawidłowe hasło itp. Niektóre problemy mogą być przejściowe i rozwiązać się za kilka sekund do minut. Logikę ponawiania prób można skonfigurować w aplikacji, aby przezwyciężyć błędy przejściowe.
Konfigurowanie logiki ponawiania prób w aplikacji pomaga ulepszyć środowisko użytkownika końcowego. W scenariuszach awarii użytkownicy będą czekać nieco dłużej, aż aplikacja będzie obsługiwać żądania, a nie napotkać błędów.
W poniższym przykładzie pokazano, jak zaimplementować logikę ponawiania prób w aplikacji. Przykładowy fragment kodu próbuje żądać bazy danych co 60 sekund (maksymalnie pięć razy), dopóki nie powiedzie się. Liczbę i częstotliwość ponownych prób można skonfigurować na podstawie potrzeb aplikacji.
W tym kodzie zastąp <klaster> nazwą klastra i <hasłem> administratora.
import psycopg2
import time
from psycopg2 import pool
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
def executeRetry(query, retryCount):
for x in range(retryCount):
try:
if (postgreSQL_pool):
# Use getconn() to Get Connection from connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
break
except Exception as err:
print(err)
postgreSQL_pool.putconn(conn)
time.sleep(60)
return None
print(executeRetry("select 1", 5))
Następne kroki
- Zobacz, jak interfejs API usługi Azure Cosmos DB for PostgreSQL rozszerza bazę danych PostgreSQL i wypróbuj przydatne zapytania diagnostyczne
- Wybieranie najlepszego rozmiaru klastra dla obciążenia
- Monitorowanie wydajności klastra