Menggunakan Python untuk menyambungkan dan menjalankan perintah SQL di Azure Cosmos DB for PostgreSQL

BERLAKU UNTUK: Azure Cosmos DB for PostgreSQL (didukung oleh ekstensi database Citus ke PostgreSQL)

Mulai cepat ini menunjukkan kepada Anda cara menggunakan kode Python untuk menyambungkan ke kluster, dan menggunakan pernyataan SQL untuk membuat tabel. Anda kemudian akan menyisipkan, mengkueri, memperbarui, dan menghapus data dalam database. Langkah-langkah dalam artikel ini mengasumsikan bahwa Anda terbiasa dengan pengembangan Python, dan baru bekerja dengan Azure Cosmos DB for PostgreSQL.

Menginstal pustaka PostgreSQL

Contoh kode dalam artikel ini memerlukan pustaka psycopg2 . Anda harus menginstal psycopg2 dengan manajer paket bahasa Anda (seperti pip).

Menyambungkan, membuat tabel, dan menyisipkan data

Contoh kode berikut membuat kumpulan koneksi ke database Postgres Anda. Kemudian menggunakan fungsi cursor.execute dengan pernyataan SQL CREATE TABLE dan INSERT INTO untuk membuat tabel dan menyisipkan data.

Tip

Kode sampel di bawah ini menggunakan kumpulan koneksi untuk membuat dan mengelola koneksi ke PostgreSQL. Pengumpulan koneksi sisi aplikasi sangat disarankan karena:

  • Tindakan ini memastikan bahwa aplikasi tidak menghasilkan terlalu banyak koneksi ke database, sehingga menghindari kelebihan batas koneksi.
  • Tindakan ini dapat membantu meningkatkan performa secara drastis baik latensi maupun throughput. Proses server PostgreSQL harus melakukan fork untuk menangani setiap koneksi baru, dan menggunakan kembali koneksi menghindari overhead tersebut.

Dalam kode berikut, ganti <kluster> dengan nama kluster dan <kata sandi> Anda dengan kata sandi administrator Anda.

Catatan

Contoh ini menutup koneksi di akhir, jadi jika Anda ingin menjalankan sampel lain dalam artikel dalam sesi yang sama, jangan sertakan # Clean up bagian saat Anda menjalankan sampel ini.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Ketika kode berjalan dengan sukses, kode menghasilkan output berikut:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Mendistribusikan tabel

Azure Cosmos DB for PostgreSQL memberi Anda kekuatan super dalam mendistribusikan tabel di beberapa simpul untuk skalabilitas. Perintah di bawah ini memungkinkan Anda untuk mendistribusikan tabel. Anda dapat mempelajari selengkapnya tentang create_distributed_table dan kolom distribusi di sini.

Catatan

Mendistribusikan tabel memungkinkan mereka tumbuh di setiap simpul pekerja yang ditambahkan ke kluster.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Membaca data

Contoh kode berikut menggunakan API berikut untuk membaca data dari database:

# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Memperbarui data

Contoh kode berikut menggunakan cursor.execute dengan pernyataan SQL UPDATE untuk memperbarui data.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Menghapus data

Contoh kode berikut berjalan cursor.execute dengan pernyataan SQL DELETE untuk menghapus data.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

Perintah COPY untuk penyerapan cepat

Perintah COPY dapat menghasilkan throughput yang luar biasa sambil menyerap data ke Azure Cosmos DB for PostgreSQL. Perintah COPY dapat menyerap data dalam file, atau dari batch mikro data dalam memori untuk penyerapan real time.

Perintah COPY untuk memuat data dari file

Kode berikut menyalin data dari file CSV ke tabel database. Kode memerlukan file pharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

Perintah COPY untuk memuat data dalam memori

Kode berikut menyalin data dalam memori ke tabel.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Aplikasi mencoba kembali untuk kegagalan permintaan database

Terkadang ada kemungkinan permintaan database dari aplikasi Anda gagal. Masalah tersebut dapat terjadi dalam skenario yang berbeda, seperti kegagalan jaringan antara aplikasi dan database, kata sandi yang salah, dll. Beberapa masalah mungkin bersifat sementara, dan selesai sendiri dalam beberapa detik hingga menit. Anda dapat mengonfigurasi logika coba lagi di aplikasi Anda untuk mengatasi kesalahan sementara.

Mengonfigurasi logika coba lagi di aplikasi Anda membantu meningkatkan pengalaman pengguna akhir. Dalam skenario kegagalan, pengguna hanya akan menunggu sedikit lebih lama agar aplikasi melayani permintaan, alih-alih mengalami kesalahan.

Contoh di bawah ini menunjukkan cara menerapkan logika coba lagi di aplikasi Anda. Cuplikan kode sampel mencoba permintaan database setiap 60 detik (hingga lima kali) sampai berhasil. Jumlah dan frekuensi percobaan ulang dapat dikonfigurasi sesuai kebutuhan aplikasi Anda.

Dalam kode ini, ganti <kluster> dengan nama kluster dan <kata sandi> Anda dengan kata sandi administrator Anda.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Langkah berikutnya