Utilizar o Python para ligar e executar comandos SQL no Azure Cosmos DB para PostgreSQL

APLICA-SE A: Azure Cosmos DB para PostgreSQL (com tecnologia da extensão da base de dados Citus para PostgreSQL)

Este início rápido mostra-lhe como utilizar o código Python para ligar a um cluster e utilizar instruções SQL para criar uma tabela. Em seguida, irá inserir, consultar, atualizar e eliminar dados na base de dados. Os passos neste artigo partem do princípio de que está familiarizado com o desenvolvimento do Python e que nunca trabalhou com o Azure Cosmos DB para PostgreSQL.

Instalar a biblioteca do PostgreSQL

Os exemplos de código neste artigo requerem a biblioteca psycopg2 . Terá de instalar o psycopg2 com o seu gestor de pacotes de idiomas (como pip).

Ligar, criar uma tabela e inserir dados

O exemplo de código seguinte cria um conjunto de ligações para a sua base de dados Postgres. Em seguida, utiliza funções cursor.execute com instruções SQL CREATE TABLE e INSERT INTO para criar uma tabela e inserir dados.

Dica

O código de exemplo abaixo utiliza um conjunto de ligações para criar e gerir ligações ao PostgreSQL. O conjunto de ligações do lado da aplicação é altamente recomendado porque:

  • Garante que a aplicação não gera demasiadas ligações à base de dados, pelo que evita exceder os limites de ligação.
  • Pode ajudar a melhorar drasticamente o desempenho, tanto a latência como o débito. O processo do servidor PostgreSQL tem de fork para processar cada nova ligação e reutilizar uma ligação evita essa sobrecarga.

No código seguinte, substitua <cluster> pelo nome do cluster e <palavra-passe pela palavra-passe> de administrador.

Nota

Este exemplo fecha a ligação no final, por isso, se quiser executar os outros exemplos no artigo na mesma sessão, não inclua a # Clean up secção quando executar este exemplo.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Quando o código é executado com êxito, produz o seguinte resultado:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Distribuir tabelas

O Azure Cosmos DB para PostgreSQL dá-lhe a capacidade de distribuir tabelas por vários nós para escalabilidade. O comando abaixo permite-lhe distribuir uma tabela. Pode saber mais sobre create_distributed_table e a coluna de distribuição aqui.

Nota

A distribuição de tabelas permite-lhes crescer em todos os nós de trabalho adicionados ao cluster.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Ler dados

O seguinte exemplo de código utiliza as seguintes APIs para ler dados da base de dados:

  • cursor.execute com a instrução SQL SELECT para ler dados.
  • cursor.fetchall() para aceitar uma consulta e devolver um conjunto de resultados para iterar.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Atualizar dados

O seguinte exemplo de código utiliza cursor.execute com a instrução SQL UPDATE para atualizar os dados.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Eliminar dados

O seguinte exemplo de código é executado cursor.execute com a instrução SQL DELETE para eliminar os dados.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

Comando COPY para ingestão rápida

O comando COPY pode gerar um débito tremendo ao ingerir dados no Azure Cosmos DB para PostgreSQL. O comando COPY pode ingerir dados em ficheiros ou a partir de micro lotes de dados na memória para ingestão em tempo real.

Comando COPY para carregar dados de um ficheiro

O código seguinte copia dados de um ficheiro CSV para uma tabela de base de dados. O código requer o ficheiro pharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

Comando COPY para carregar dados dentro da memória

O código seguinte copia dados dentro da memória para uma tabela.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Repetição da aplicação para falhas de pedidos de base de dados

Por vezes, é possível que os pedidos de base de dados da sua aplicação falhem. Estes problemas podem ocorrer em diferentes cenários, como a falha de rede entre a aplicação e a base de dados, palavra-passe incorreta, etc. Alguns problemas podem ser transitórios e resolver-se dentro de alguns segundos a minutos. Pode configurar a lógica de repetição na sua aplicação para ultrapassar os erros transitórios.

Configurar a lógica de repetição na sua aplicação ajuda a melhorar a experiência do utilizador final. Em cenários de falha, os utilizadores apenas aguardarão um pouco mais para que a aplicação sirva pedidos, em vez de deparar-se com erros.

O exemplo abaixo mostra como implementar a lógica de repetição na sua aplicação. O fragmento de código de exemplo tenta um pedido de base de dados a cada 60 segundos (até cinco vezes) até ter êxito. O número e a frequência das repetições podem ser configurados com base nas necessidades da sua aplicação.

Neste código, substitua cluster> pelo <nome do cluster e <palavra-passe pela palavra-passe> de administrador.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Passos seguintes