Заметка
Доступ к этой странице требует авторизации. Вы можете попробовать войти в систему или изменить каталог.
Доступ к этой странице требует авторизации. Вы можете попробовать сменить директорию.
Это важно
Azure Cosmos DB для PostgreSQL больше не поддерживается для новых проектов. Не используйте эту службу для новых проектов. Вместо этого используйте одну из этих двух служб:
Используйте Azure Cosmos DB для NoSQL как распределенное решение базы данных, предназначенное для крупномасштабных сценариев с соглашением об уровне доступности (SLA) 99.999%, мгновенным автомасштабированием и автоматическим переключением в случае отказа в нескольких регионах.
Используйте функцию эластичных кластеров Базы данных Azure для PostgreSQL для сегментированного PostgreSQL с помощью расширения Citus с открытым кодом.
В этом кратком руководстве показано, как использовать код Python для подключения к кластеру и использовать инструкции SQL для создания таблицы. Затем вы вставляете, запрашиваете, обновляете и удаляете данные в базе данных. В этой статье предполагается, что вы знакомы с разработкой Python и знакомы с Azure Cosmos DB для PostgreSQL.
Установка библиотеки PostgreSQL
Примеры кода в этой статье требуют библиотеки psycopg2 . Вам потребуется установить psycopg2 с помощью диспетчера языковых пакетов (например, pip).
Подключение, создание таблицы и вставка данных
В следующем примере кода создается пул подключений к базе данных Postgres. Затем она использует функции cursor.execute с инструкциями SQL CREATE TABLE и INSERT INTO для создания таблицы и вставки данных.
Совет
В приведенном ниже примере кода используется пул подключений для создания подключений к PostgreSQL и управления ими. Настоятельно рекомендуется использовать пул подключений на стороне приложения, так как:
- Это гарантирует, что приложение не будет создавать слишком много подключений к базе данных, поэтому можно будет избежать превышения ограничений на подключения.
- Это может значительно повысить производительность — как с точки зрения задержки, так и с точки зрения пропускной способности. Процесс сервера PostgreSQL должен разделиться, чтобы обработать каждое новое подключение, а повторное использование подключения позволяет избежать этих издержек.
В следующем коде замените <кластер> вашим именем кластера и <паролем> вашего администратора или маркером идентификатора Microsoft Entra.
Примечание.
Этот пример закрывает подключение в конце, поэтому если вы хотите запустить другие примеры в статье в том же сеансе, не включайте # Clean up раздел при запуске этого примера.
import psycopg2
from psycopg2 import pool
# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
print("Connection pool created successfully")
# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")
# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")
# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")
# Insert some data into the table
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")
# Clean up
conn.commit()
cursor.close()
conn.close()
При успешном выполнении кода возвращаются следующие данные:
Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data
Распределение таблиц
Azure Cosmos DB для PostgreSQL предоставляет супер возможности распределения таблиц между несколькими узлами для масштабируемости. С помощью приведенной ниже команды можно распределить таблицу. Дополнительные сведения о create_distributed_table и столбце распределения см. здесь.
Примечание.
Распределение таблиц позволяет им расширяться на любые рабочие узлы, добавляемые в кластер.
# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")
Чтение данных
В следующем примере кода используются следующие API для чтения данных из базы данных:
- cursor.execute с помощью инструкции SQL SELECT для чтения данных.
- cursor.fetchall() принимает запрос и возвращает результирующий набор для итерации.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()
# Print all rows
for row in rows:
print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))
Обновление данных
В следующем примере кода используется cursor.execute оператор SQL UPDATE для обновления данных.
# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")
Удаление данных
В следующем примере кода выполняется cursor.execute инструкция SQL DELETE для удаления данных.
# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")
Команда COPY для быстрого приема
Команда COPY может обеспечить огромную пропускную способность при приеме данных в Azure Cosmos DB для PostgreSQL. Команда COPY может принимать данные в файлах или из микропакетов данных в памяти для приема в реальном времени.
Команда COPY для загрузки данных из файла
Следующий код копирует данные из CSV-файла в таблицу базы данных. Чтобы код работал, требуется файл pharmacies.csv.
with open('pharmacies.csv', 'r') as f:
# Notice that we don't need the `csv` module.
next(f) # Skip the header row.
cursor.copy_from(f, 'pharmacy', sep=',')
print("copying data completed")
Команда COPY для загрузки данных в памяти
Следующий код копирует данные в память в таблицу.
data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)
buf.seek(0)
with conn.cursor() as cur:
cur.copy_from(buf, "pharmacy", sep=",")
conn.commit()
conn.close()
Повторные попытки приложения при сбоях запросов к базе данных
Иногда запросы к базе данных из приложения могут завершаться ошибкой. Такие проблемы могут возникать при различных сценариях, включая сбой сети между приложением и базой данных, неправильный пароль и т. д. Некоторые проблемы могут быть временными и самостоятельно устраняться в течение нескольких секунд или минут. Для устранения временных ошибок можно настроить в приложении логику повторных попыток.
Настройка логики повторных попыток в приложении помогает улучшить взаимодействие с пользователем. В сценариях сбоя пользователи просто будут немного дольше ждать, пока приложение обслужит запросы, а не столкнутся с ошибками.
В приведенном ниже примере показано, как реализовать логику повторных попыток в вашем приложении. Фрагмент примера кода пытается выполнить запрос к базе данных каждые 60 секунд (до пяти раз) до достижения требуемого результата. Количество и частоту повторных попыток можно настроить в зависимости от требований вашего приложения.
В этом коде замените <кластер> именем вашего кластера и <пароль> паролем администратора.
import psycopg2
import time
from psycopg2 import pool
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
def executeRetry(query, retryCount):
for x in range(retryCount):
try:
if (postgreSQL_pool):
# Use getconn() to Get Connection from connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
break
except Exception as err:
print(err)
postgreSQL_pool.putconn(conn)
time.sleep(60)
return None
print(executeRetry("select 1", 5))
Следующие шаги
- Узнайте, как API Azure Cosmos DB для PostgreSQL расширяет PostgreSQL и попробуйте полезные диагностические запросы
- Выбор оптимального размера кластера для рабочей нагрузки
- Мониторинг производительности кластера