Использование Python для подключения и выполнения команд SQL в Azure Cosmos DB для PostgreSQL
ПРИМЕНИМО К: Azure Cosmos DB для PostgreSQL (на базе расширения базы данных Citus для PostgreSQL)
В этом кратком руководстве показано, как использовать код Python для подключения к кластеру и использовать инструкции SQL для создания таблицы. Затем вы будете вставлять, запрашивать, обновлять и удалять данные в базе данных. В этой статье предполагается, что вы знакомы с разработкой на Python и еще не работали с Azure Cosmos DB для PostgreSQL.
Установка библиотеки PostgreSQL
Для примеров кода в этой статье требуется библиотека psycopg2 . Необходимо установить psycopg2 с помощью диспетчера языковых пакетов (например, pip).
Подключение, создание таблицы и вставка данных
В следующем примере кода создается пул подключений к базе данных Postgres. Затем он использует функции cursor.execute с инструкциями SQL CREATE TABLE и INSERT INTO для создания таблицы и вставки данных.
Совет
В приведенном ниже примере кода используется пул подключений для создания подключений к PostgreSQL и управления ими. Настоятельно рекомендуется использовать пул подключений на стороне приложения, так как:
- Это гарантирует, что приложение не будет создавать слишком много подключений к базе данных, поэтому можно будет избежать превышения ограничений на подключения.
- Это может значительно повысить производительность — как с точки зрения задержки, так и с точки зрения пропускной способности. Процесс сервера PostgreSQL должен разделиться, чтобы обработать каждое новое подключение, а повторное использование подключения позволяет избежать этих издержек.
В следующем коде замените <кластер> именем кластера, а <пароль> — паролем администратора.
Примечание
В этом примере подключение закрывается в конце, поэтому если вы хотите запустить другие примеры из статьи в том же сеансе, не включайте # Clean up
раздел при запуске этого примера.
import psycopg2
from psycopg2 import pool
# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
print("Connection pool created successfully")
# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")
# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")
# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")
# Insert some data into the table
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")
# Clean up
conn.commit()
cursor.close()
conn.close()
При успешном выполнении кода возвращаются следующие данные:
Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data
Распределение таблиц
Azure Cosmos DB для PostgreSQL предоставляет супер возможности распределения таблиц между несколькими узлами для обеспечения масштабируемости. С помощью приведенной ниже команды можно распределить таблицу. Дополнительные сведения о create_distributed_table
и столбце распределения см. здесь.
Примечание
Распределение таблиц позволяет увеличивать их по всем рабочим узлам, добавленным в кластер.
# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")
Чтение данных
В следующем примере кода для чтения данных из базы данных используются следующие API:
- cursor.execute с инструкцией SQL SELECT для чтения данных.
- cursor.fetchall() для принятия запроса и возврата результирующий набор для итерации.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()
# Print all rows
for row in rows:
print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))
Обновление данных
В следующем примере кода для обновления данных используется cursor.execute
инструкция SQL UPDATE.
# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")
Удаление данных
Следующий пример кода выполняется cursor.execute
с инструкцией SQL DELETE для удаления данных.
# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")
Команда COPY для быстрого приема
Команда COPY может обеспечить огромную пропускную способность при приеме данных в Azure Cosmos DB для PostgreSQL. Команда COPY может принимать данные в файлах или из микропакетов данных в памяти для приема в реальном времени.
Команда COPY для загрузки данных из файла
Следующий код копирует данные из CSV-файла в таблицу базы данных. Для этого кода требуется файл pharmacies.csv.
with open('pharmacies.csv', 'r') as f:
# Notice that we don't need the `csv` module.
next(f) # Skip the header row.
cursor.copy_from(f, 'pharmacy', sep=',')
print("copying data completed")
Команда COPY для загрузки данных в памяти
Следующий код копирует данные в памяти в таблицу.
data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)
buf.seek(0)
with conn.cursor() as cur:
cur.copy_from(buf, "pharmacy", sep=",")
conn.commit()
conn.close()
Повторная попытка приложения при сбоях запросов к базе данных
Иногда запросы к базе данных из приложения могут завершаться ошибкой. Такие проблемы могут возникать при различных сценариях, включая сбой сети между приложением и базой данных, неправильный пароль и т. д. Некоторые проблемы могут быть временными и самостоятельно устраняться в течение нескольких секунд или минут. Для устранения временных ошибок можно настроить в приложении логику повторных попыток.
Настройка логики повторных попыток в приложении помогает улучшить взаимодействие с пользователем. В сценариях сбоя пользователи просто будут немного дольше ждать, пока приложение обслужит запросы, а не столкнутся с ошибками.
В приведенном ниже примере показано, как реализовать логику повторных попыток в вашем приложении. Фрагмент примера кода пытается выполнить запрос к базе данных каждые 60 секунд (до пяти раз) до достижения требуемого результата. Количество и частоту повторных попыток можно настроить в зависимости от требований вашего приложения.
В этом коде замените <кластер> именем кластера, а <пароль> — паролем администратора.
import psycopg2
import time
from psycopg2 import pool
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
def executeRetry(query, retryCount):
for x in range(retryCount):
try:
if (postgreSQL_pool):
# Use getconn() to Get Connection from connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
break
except Exception as err:
print(err)
postgreSQL_pool.putconn(conn)
time.sleep(60)
return None
print(executeRetry("select 1", 5))
Дальнейшие действия
- Узнайте, как API Azure Cosmos DB для PostgreSQL расширяет Возможности PostgreSQL, и попробуйте использовать полезные диагностические запросы.
- Выбор оптимального размера кластера для рабочей нагрузки
- Мониторинг производительности кластера