Partilhar via


Usar Python para conectar e executar comandos SQL no Azure Cosmos DB para PostgreSQL

Importante

O Azure Cosmos DB para PostgreSQL não tem mais suporte para novos projetos. Não use este serviço para novos projetos. Em vez disso, use um destes dois serviços:

  • Use o Azure Cosmos DB para NoSQL para obter uma solução de banco de dados distribuído projetada para cenários de alta escala com um SLA (contrato de nível de serviço) de disponibilidade de 99.999%, dimensionamento automático instantâneo e failover automático em várias regiões.

  • Use a funcionalidade de Clusters Elásticos do Azure para PostgreSQL para PostgreSQL fragmentado, utilizando a extensão Citus de código aberto.

Este guia de início rápido mostra como usar o código Python para se conectar a um cluster e usar instruções SQL para criar uma tabela. Em seguida, você inserirá, consultará, atualizará e excluirá dados no banco de dados. As etapas neste artigo pressupõem que você esteja familiarizado com o desenvolvimento Python e seja novo no trabalho com o Azure Cosmos DB para PostgreSQL.

Instalar a biblioteca PostgreSQL

Os exemplos de código neste artigo requerem a biblioteca psycopg2 . Você precisará instalar o psycopg2 com seu gerenciador de pacotes de idiomas (como pip).

Conectar, criar uma tabela e inserir dados

O exemplo de código a seguir cria um pool de conexões para seu banco de dados Postgres. Em seguida, ele usa funções cursor.execute com instruções SQL, CREATE TABLE e INSERT INTO para criar uma tabela e inserir dados.

Gorjeta

O código de exemplo abaixo usa um pool de conexões para criar e gerenciar conexões com o PostgreSQL. O agrupamento de conexões no lado do aplicativo é altamente recomendado porque:

  • Ele garante que o aplicativo não gere muitas conexões com o banco de dados e, portanto, evita exceder os limites de conexão.
  • Ele pode ajudar a melhorar drasticamente o desempenho, tanto a latência quanto a taxa de transferência. O processo do servidor PostgreSQL deve ser bifurcado para lidar com cada nova conexão, e a reutilização de uma conexão evita essa sobrecarga.

No código a seguir, substitua <cluster> pelo nome do seu cluster e <password> pela sua senha de administrador ou token de ID do Microsoft Entra.

Nota

Este exemplo fecha a conexão no final, portanto, se você quiser executar os outros exemplos no artigo na mesma sessão, não inclua a # Clean up seção quando executar este exemplo.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Quando o código é executado com êxito, ele produz a seguinte saída:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Distribuir tabelas

O Azure Cosmos DB para PostgreSQL oferece o super poder de distribuir tabelas entre vários nós para escalabilidade. O comando abaixo permite distribuir uma tabela. Você pode saber mais sobre create_distributed_table e a coluna de distribuição aqui.

Nota

A distribuição de tabelas permite o seu crescimento em todos os nós de trabalho adicionados ao cluster.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Ler dados

O exemplo de código a seguir usa as seguintes APIs para ler dados do banco de dados:

  • cursor.execute com a instrução SQL SELECT para ler dados.
  • cursor.fetchall() para aceitar uma consulta e retornar um conjunto de resultados para iterar.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Atualizar dados

O exemplo de código a seguir usa cursor.execute com a instrução SQL UPDATE para atualizar dados.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Eliminar dados

O exemplo de código a seguir é executado cursor.execute com a instrução SQL DELETE para excluir os dados.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

Comando COPY para ingestão rápida

O comando COPY pode gerar uma taxa de transferência tremenda ao ingerir dados no Azure Cosmos DB para PostgreSQL. O comando COPY pode ingerir dados em arquivos ou de microlotes de dados na memória para ingestão em tempo real.

Comando COPY para carregar dados de um ficheiro

O código a seguir copia dados de um arquivo CSV para uma tabela de banco de dados. O código requer o arquivo pharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

Comando COPY para carregar dados na memória

O código a seguir copia dados na memória para uma tabela.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Repetição de aplicativo para falhas de solicitação de banco de dados

Às vezes, é possível que as solicitações de banco de dados do seu aplicativo falhem. Tais problemas podem acontecer em diferentes cenários, como falha de rede entre o aplicativo e o banco de dados, senha incorreta, etc. Alguns problemas podem ser transitórios e resolver-se em poucos segundos a minutos. Você pode configurar a lógica de repetição em seu aplicativo para superar os erros transitórios.

Configurar a lógica de repetição em seu aplicativo ajuda a melhorar a experiência do usuário final. Em cenários de falha, os usuários apenas esperarão um pouco mais para que o aplicativo atenda às solicitações, em vez de experimentar erros.

O exemplo abaixo mostra como implementar a lógica de repetição em seu aplicativo. O trecho de código de exemplo tenta uma solicitação de banco de dados a cada 60 segundos (até cinco vezes) até que seja bem-sucedido. O número e a frequência de novas tentativas podem ser configurados com base nas necessidades do seu aplicativo.

Neste código, substitua <o cluster> pelo nome e <senha> do cluster pela senha de administrador.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Próximos passos