Usar o Python para se conectar e executar comandos SQL no Azure Cosmos DB for PostgreSQL

APLICA-SE A: Azure Cosmos DB for PostgreSQL (da plataforma da extensão de dados Citus para PostgreSQL)

Este início rápido mostra como usar o código Python para conectar um cluster e usar as instruções SQL para criar uma tabela. Em seguida, você irá inserir, consultar, atualizar e excluir dados no banco de dados. As etapas neste artigo pressupõem que você esteja familiarizado com o desenvolvimento usando Python e que tenha começado a trabalhar recentemente com o Azure Cosmos DB for PostgreSQL.

Instalar a biblioteca PostgreSQL

Os exemplos de código neste artigo exigem a biblioteca psycopg2. Será necessário instalar o psycopg2 com o gerenciador de pacotes de idiomas (como pip).

Conectar-se, criar uma tabela e inserir dados

O exemplo de código a seguir cria um pool de conexões para o banco de dados Postgres. Em seguida, usará as funções cursor.execute com as instruções SQL CREATE TABLE e INSERT INTO para criar uma tabela e inserir os dados.

Dica

O código de exemplo abaixo usa um pool de conexões para criar e gerenciar conexões com o PostgreSQL. O pool de conexões do lado do aplicativo é altamente recomendado porque:

  • Ele garante que o aplicativo não gere muitas conexões com o banco de dados e, portanto, evita exceder os limites de conexão.
  • Isso pode ajudar a aprimorar drasticamente o desempenho, tanto de latência quanto de taxa de transferência. O processo do servidor PostgreSQL precisa ser bifurcado para gerenciar cada nova conexão e reutilizar uma conexão evita essa sobrecarga.

No código a seguir, substitua <cluster> pelo nome do seu cluster e <password> pela senha do administrador.

Observação

Este exemplo fecha a conexão no final. Portanto, se você quiser executar os outros exemplos no artigo na mesma sessão, não inclua a seção # Clean up ao executar este exemplo.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Quando o código é executado com êxito, ele produz a seguinte saída:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Distribuir tabelas

O Azure Cosmos DB for PostgreSQL oferece a superpotência da distribuição de tabelas entre vários nós, permitindo a escalabilidade. O comando abaixo permite a distribuição de uma tabela. Saiba mais sobre create_distributed_table e a coluna de distribuição aqui.

Observação

A distribuição de tabelas permite que elas cresçam em todos os nós de trabalho adicionados ao cluster.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Ler dados

O exemplo de código a seguir usa as seguintes APIs para ler dados do banco de dados:

  • a função cursor.execute com a instrução SQL SELECT para ler dados.
  • a função cursor.fetchall() para aceitar uma consulta e retornar um conjunto de resultados para iteração.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Atualizar dados

O exemplo de código a seguir usa a função cursor.execute com a instrução SQL UPDATE para atualizar dados.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Excluir dados

O exemplo de código a seguir executa cursor.execute com a instrução SQL DELETE para excluir os dados.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

Comando COPY para ingestão rápida

O comando COPY pode gerar uma taxa de transferência enorme ao ingerir dados no Azure Cosmos DB for PostgreSQL. O comando COPY pode ingerir dados em arquivos ou de microlotes de dados na memória para ingestão em tempo real.

Comando COPY para carregar dados de um arquivo

O código a seguir copia dados de um arquivo CSV para uma tabela de banco de dados. O código exige o arquivo pharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

Comando COPY para carregar dados na memória

O código a seguir copia dados na memória em uma tabela.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Repetição de aplicativo para falhas de solicitação de banco de dados

Às vezes, as solicitações de banco de dados do aplicativo podem falhar. Esses problemas podem ocorrer em diferentes cenários, como falha de rede entre o aplicativo e o banco de dados, senha incorreta etc. Alguns problemas podem ser transitórios e serem resolvidos em alguns segundos ou minutos. Você pode configurar a lógica de repetição no aplicativo para corrigir os erros transitórios.

A configuração da lógica de repetição no aplicativo ajuda a aprimorar a experiência do usuário final. Em cenários de falha, os usuários vão apenas esperar um pouco mais para que o aplicativo atenda às solicitações, em vez de receber erros.

O exemplo abaixo mostra como implementar a lógica de repetição no aplicativo. O snippet de código de exemplo tenta uma solicitação de banco de dados a cada 60 segundos (até cinco vezes) até que ela seja bem-sucedida. O número e a frequência de repetições podem ser configurados com base nas necessidades do aplicativo.

No código, substitua <cluster> pelo nome do cluster e <senha> pela senha do administrador.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Próximas etapas