Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ważne
Usługa Azure Cosmos DB for PostgreSQL nie jest już obsługiwana w przypadku nowych projektów. Nie używaj tej usługi dla nowych projektów. Zamiast tego użyj jednej z tych dwóch usług:
Użyj usługi Azure Cosmos DB for NoSQL dla rozproszonego rozwiązania bazy danych przeznaczonego dla scenariuszy o dużej skali z umową dotyczącą poziomu usług dostępności 99,999% (SLA), natychmiastowym skalowaniem automatycznym i automatycznym przejściem w tryb failover w wielu regionach.
Użyj funkcji Elastic Clusters usługi Azure Database for PostgreSQL na potrzeby fragmentowanej bazy danych PostgreSQL przy użyciu rozszerzenia Citus typu open source.
W tym przewodniku Szybki start pokazano, jak używać kodu w języku Python do nawiązywania połączenia z klastrem i używać instrukcji SQL do tworzenia tabeli. Następnie wstawisz, wykonasz zapytanie, zaktualizujesz i usuniesz dane w bazie danych. W krokach opisanych w tym artykule założono, że znasz programowanie w języku Python i dopiero zaczynasz pracę z usługą Azure Cosmos DB for PostgreSQL.
Instalowanie biblioteki PostgreSQL
Przykłady kodu w tym artykule wymagają biblioteki psycopg2 . Musisz zainstalować psycopg2 za pomocą menedżera pakietów dla języka Python (takiego jak pip).
Łączenie, tworzenie tabeli i wstawianie danych
Poniższy przykład kodu tworzy pulę połączeń z bazą danych Postgres. Następnie używa funkcji cursor.execute z instrukcjami SQL CREATE TABLE i INSERT INTO, aby utworzyć tabelę i wstawić dane.
Napiwek
Poniższy przykładowy kod używa puli połączeń do tworzenia połączeń z bazą danych PostgreSQL i zarządzania nimi. Zdecydowanie zaleca się wykorzystanie puli połączeń na poziomie aplikacji, ponieważ:
- Gwarantuje to, że aplikacja nie generuje zbyt wielu połączeń z bazą danych, dlatego unika przekraczania limitów połączeń.
- Może to pomóc znacząco poprawić wydajność — zarówno opóźnienie, jak i przepływność. Proces serwera PostgreSQL musi utworzyć nowy proces dla obsługi każdego nowego połączenia, a ponowne użycie połączenia pozwala uniknąć tego obciążenia.
W poniższym kodzie zastąp <klaster> nazwą klastra oraz <hasło> hasłem administratora lub tokenem Microsoft Entra ID.
Uwaga
Ten przykład zamyka połączenie na końcu, więc jeśli chcesz uruchomić inne przykłady w artykule w tej samej sesji, nie uwzględnij # Clean up sekcji podczas uruchamiania tego przykładu.
import psycopg2
from psycopg2 import pool
# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
print("Connection pool created successfully")
# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")
# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")
# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")
# Insert some data into the table
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")
# Clean up
conn.commit()
cursor.close()
conn.close()
Po pomyślnym uruchomieniu kodu generuje następujące dane wyjściowe:
Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data
Dystrybuowanie tabel
Usługa Azure Cosmos DB for PostgreSQL zapewnia niezwykłą możliwość dystrybucji tabel między wieloma węzłami w celu zapewnienia skalowalności. Poniższe polecenie umożliwia dystrybucję tabeli. Więcej informacji na temat create_distributed_table i kolumny dystrybucji można znaleźć tutaj.
Uwaga
Dystrybucja tabel umożliwia ich skalowanie pomiędzy węzłami roboczymi dodanymi do klastra.
# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")
Odczyt danych
Poniższy przykład kodu używa następujących interfejsów API do odczytywania danych z bazy danych:
- cursor.execute z instrukcją SQL SELECT w celu odczytu danych.
- cursor.fetchall() w celu zaakceptowania zapytania i zwrócenia zestawu wyników w celu iteracji.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()
# Print all rows
for row in rows:
print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))
Aktualizowanie danych
Poniższy przykład kodu używa instrukcji cursor.execute SQL UPDATE do aktualizowania danych.
# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")
Usuwanie danych
Poniższy przykładowy kod jest uruchamiany cursor.execute z instrukcją SQL DELETE w celu usunięcia danych.
# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")
POLECENIE COPY na potrzeby szybkiego pozyskiwania
Polecenie COPY może przynieść ogromną przepływność podczas ładowania danych do usługi Azure Cosmos DB for PostgreSQL. Polecenie COPY może przetwarzać dane z plików lub z mikro-partii danych w pamięci na potrzeby przetwarzania w czasie rzeczywistym.
POLECENIE COPY w celu załadowania danych z pliku
Poniższy kod kopiuje dane z pliku CSV do tabeli bazy danych. Kod wymaga pliku pharmacies.csv.
with open('pharmacies.csv', 'r') as f:
# Notice that we don't need the `csv` module.
next(f) # Skip the header row.
cursor.copy_from(f, 'pharmacy', sep=',')
print("copying data completed")
POLECENIE COPY w celu załadowania danych w pamięci
Poniższy kod kopiuje dane w pamięci do tabeli.
data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)
buf.seek(0)
with conn.cursor() as cur:
cur.copy_from(buf, "pharmacy", sep=",")
conn.commit()
conn.close()
Ponowne próby przy błędach żądań do bazy danych
Czasami istnieje możliwość, że żądania bazy danych z aplikacji kończą się niepowodzeniem. Takie problemy mogą wystąpić w różnych scenariuszach, takich jak awaria sieci między aplikacją i bazą danych, nieprawidłowe hasło itp. Niektóre problemy mogą być przejściowe i rozwiązać się za kilka sekund do minut. Logikę ponawiania prób można skonfigurować w aplikacji, aby przezwyciężyć błędy przejściowe.
Konfigurowanie logiki ponawiania prób w aplikacji pomaga ulepszyć środowisko użytkownika końcowego. W scenariuszach awarii użytkownicy będą czekać nieco dłużej, aż aplikacja będzie obsługiwać żądania, a nie napotkać błędów.
W poniższym przykładzie pokazano, jak zaimplementować logikę ponawiania prób w aplikacji. Przykładowy fragment kodu próbuje żądać bazy danych co 60 sekund (maksymalnie pięć razy), dopóki nie powiedzie się. Liczbę i częstotliwość ponownych prób można skonfigurować na podstawie potrzeb aplikacji.
W tym kodzie zastąp <klaster> nazwą klastra oraz <hasło> hasłem administratora.
import psycopg2
import time
from psycopg2 import pool
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
def executeRetry(query, retryCount):
for x in range(retryCount):
try:
if (postgreSQL_pool):
# Use getconn() to Get Connection from connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
break
except Exception as err:
print(err)
postgreSQL_pool.putconn(conn)
time.sleep(60)
return None
print(executeRetry("select 1", 5))
Następne kroki
- Zobacz, jak interfejs API usługi Azure Cosmos DB for PostgreSQL rozszerza bazę danych PostgreSQL i wypróbuj przydatne zapytania diagnostyczne
- Wybieranie najlepszego rozmiaru klastra dla obciążenia
- Monitorowanie wydajności klastra