PostgreSQL için Azure Cosmos DB'de SQL komutlarına bağlanmak ve komut çalıştırmak için Python kullanma

ŞUNLAR IÇIN GEÇERLIDIR: PostgreSQL için Azure Cosmos DB (PostgreSQL'e citus veritabanı uzantısı tarafından desteklenir)

Bu hızlı başlangıçta python kodunu kullanarak bir kümeye bağlanma ve SQL deyimlerini kullanarak tablo oluşturma işlemleri gösterilmektedir. Ardından veritabanındaki verileri ekler, sorgular, güncelleştirir ve silersiniz. Bu makaledeki adımlarda Python geliştirme hakkında bilgi sahibi olduğunuz ve PostgreSQL için Azure Cosmos DB ile çalışmaya yeni olduğunuz varsayılır.

PostgreSQL kitaplığını yükleme

Bu makaledeki kod örnekleri psycopg2 kitaplığını gerektirir. Dil paketi yöneticinizle (pip gibi) psycopg2 yüklemeniz gerekir.

Bağlanma, tablo oluşturma ve veri ekleme

Aşağıdaki kod örneği Postgres veritabanınıza bir bağlantı havuzu oluşturur. Ardından, tablo oluşturmak ve veri eklemek için SQL CREATE TABLE ve INSERT INTO deyimleriyle cursor.execute işlevlerini kullanır.

İpucu

Aşağıdaki örnek kod, PostgreSQL bağlantılarını oluşturmak ve yönetmek için bir bağlantı havuzu kullanır. Uygulama tarafı bağlantı havuzu kesinlikle önerilir çünkü:

  • Uygulamanın veritabanına çok fazla bağlantı oluşturmamasını sağlar ve bu nedenle bağlantı sınırlarının aşılmasını önler.
  • Hem gecikme süresi hem de aktarım hızı gibi performansı önemli ölçüde artırmaya yardımcı olabilir. PostgreSQL sunucu işleminin her yeni bağlantıyı işlemek için çatal oluşturması gerekir ve bir bağlantıyı yeniden kullanmak bu ek yükü önler.

Aşağıdaki kodda kümeyi küme adınızla ve <parolanızla> yönetici parolanızla değiştirin<.>

Not

Bu örnek sonunda bağlantıyı kapatır, bu nedenle makaledeki diğer örnekleri aynı oturumda çalıştırmak istiyorsanız, bu örneği çalıştırdığınızda bölümünü eklemeyin # Clean up .

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

Kod başarıyla çalıştırıldığında aşağıdaki çıkışı üretir:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Tabloları dağıtma

PostgreSQL için Azure Cosmos DB, ölçeklenebilirlik için tabloları birden çok düğüme dağıtmanın süper gücünü sağlar. Aşağıdaki komut bir tabloyu dağıtmanızı sağlar. Ve dağıtım sütunu hakkında create_distributed_table daha fazla bilgiyi burada bulabilirsiniz.

Not

Tabloların dağıtılması, kümeye eklenen tüm çalışan düğümleri arasında büyümelerine olanak tanır.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Verileri okuma

Aşağıdaki kod örneği, veritabanındaki verileri okumak için aşağıdaki API'leri kullanır:

  • verileri okumak için SQL SELECT deyimiyle cursor.execute.
  • bir sorguyu kabul etmek ve yinelemek için bir sonuç kümesi döndürmek için cursor.fetchall() öğesini seçin.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Verileri güncelleştirme

Aşağıdaki kod örneği, verileri güncelleştirmek için SQL UPDATE deyimiyle birlikte kullanır cursor.execute .

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Verileri silme

Aşağıdaki kod örneği, verileri silmek için SQL DELETE deyimiyle birlikte çalışır cursor.execute .

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

Hızlı alma için COPY komutu

COPY komutu, PostgreSQL için Azure Cosmos DB'ye veri alırken muazzam aktarım hızı sağlayabilir. COPY komutu, gerçek zamanlı veri alımı için dosyalara veya bellekteki mikro veri toplu işlemlerinden veri alabilir.

Bir dosyadan veri yüklemek için COPY komutu

Aşağıdaki kod bir CSV dosyasındaki verileri veritabanı tablosuna kopyalar. Kod için dosyasınınpharmacies.csvgerekir.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

Bellek içi verileri yüklemek için COPY komutu

Aşağıdaki kod bellek içi verileri bir tabloya kopyalar.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Veritabanı isteği hataları için uygulama yeniden denemesi

Bazen uygulamanızdan gelen veritabanı isteklerinin başarısız olması mümkündür. Bu tür sorunlar, uygulama ve veritabanı arasındaki ağ hatası, yanlış parola vb. gibi farklı senaryolarda oluşabilir. Bazı sorunlar geçici olabilir ve birkaç saniye ile dakika arasında çözülebilir. Geçici hataların üstesinden gelmek için uygulamanızda yeniden deneme mantığını yapılandırabilirsiniz.

Uygulamanızda yeniden deneme mantığını yapılandırmak, son kullanıcı deneyimini geliştirmeye yardımcı olur. Hata senaryoları altında kullanıcılar, hatalarla karşılaşmak yerine uygulamanın isteklere hizmet vermesini yalnızca biraz daha bekler.

Aşağıdaki örnekte, uygulamanızda yeniden deneme mantığının nasıl uygulandığı gösterilmektedir. Örnek kod parçacığı, başarılı olana kadar her 60 saniyede bir (en fazla beş kez) bir veritabanı isteği dener. Yeniden deneme sayısı ve sıklığı, uygulamanızın gereksinimlerine göre yapılandırılabilir.

Bu kodda kümeyi küme> adınızla ve <parolanızla> yönetici parolanızla değiştirin<.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Sonraki adımlar