PostgreSQL için Azure Cosmos DB'de SQL komutlarına bağlanmak ve komut çalıştırmak için Python kullanma
ŞUNLAR IÇIN GEÇERLIDIR: PostgreSQL için Azure Cosmos DB (PostgreSQL'e citus veritabanı uzantısı tarafından desteklenir)
Bu hızlı başlangıçta python kodunu kullanarak bir kümeye bağlanma ve SQL deyimlerini kullanarak tablo oluşturma işlemleri gösterilmektedir. Ardından veritabanındaki verileri ekler, sorgular, güncelleştirir ve silersiniz. Bu makaledeki adımlarda Python geliştirme hakkında bilgi sahibi olduğunuz ve PostgreSQL için Azure Cosmos DB ile çalışmaya yeni olduğunuz varsayılır.
PostgreSQL kitaplığını yükleme
Bu makaledeki kod örnekleri psycopg2 kitaplığını gerektirir. Dil paketi yöneticinizle (pip gibi) psycopg2 yüklemeniz gerekir.
Bağlanma, tablo oluşturma ve veri ekleme
Aşağıdaki kod örneği Postgres veritabanınıza bir bağlantı havuzu oluşturur. Ardından, tablo oluşturmak ve veri eklemek için SQL CREATE TABLE ve INSERT INTO deyimleriyle cursor.execute işlevlerini kullanır.
İpucu
Aşağıdaki örnek kod, PostgreSQL bağlantılarını oluşturmak ve yönetmek için bir bağlantı havuzu kullanır. Uygulama tarafı bağlantı havuzu kesinlikle önerilir çünkü:
- Uygulamanın veritabanına çok fazla bağlantı oluşturmamasını sağlar ve bu nedenle bağlantı sınırlarının aşılmasını önler.
- Hem gecikme süresi hem de aktarım hızı gibi performansı önemli ölçüde artırmaya yardımcı olabilir. PostgreSQL sunucu işleminin her yeni bağlantıyı işlemek için çatal oluşturması gerekir ve bir bağlantıyı yeniden kullanmak bu ek yükü önler.
Aşağıdaki kodda kümeyi küme adınızla ve <parolanızla> yönetici parolanızla değiştirin<.>
Not
Bu örnek sonunda bağlantıyı kapatır, bu nedenle makaledeki diğer örnekleri aynı oturumda çalıştırmak istiyorsanız, bu örneği çalıştırdığınızda bölümünü eklemeyin # Clean up
.
import psycopg2
from psycopg2 import pool
# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
print("Connection pool created successfully")
# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")
# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")
# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")
# Insert some data into the table
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")
# Clean up
conn.commit()
cursor.close()
conn.close()
Kod başarıyla çalıştırıldığında aşağıdaki çıkışı üretir:
Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data
Tabloları dağıtma
PostgreSQL için Azure Cosmos DB, ölçeklenebilirlik için tabloları birden çok düğüme dağıtmanın süper gücünü sağlar. Aşağıdaki komut bir tabloyu dağıtmanızı sağlar. Ve dağıtım sütunu hakkında create_distributed_table
daha fazla bilgiyi burada bulabilirsiniz.
Not
Tabloların dağıtılması, kümeye eklenen tüm çalışan düğümleri arasında büyümelerine olanak tanır.
# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")
Verileri okuma
Aşağıdaki kod örneği, veritabanındaki verileri okumak için aşağıdaki API'leri kullanır:
- verileri okumak için SQL SELECT deyimiyle cursor.execute.
- bir sorguyu kabul etmek ve yinelemek için bir sonuç kümesi döndürmek için cursor.fetchall() öğesini seçin.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()
# Print all rows
for row in rows:
print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))
Verileri güncelleştirme
Aşağıdaki kod örneği, verileri güncelleştirmek için SQL UPDATE deyimiyle birlikte kullanır cursor.execute
.
# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")
Verileri silme
Aşağıdaki kod örneği, verileri silmek için SQL DELETE deyimiyle birlikte çalışır cursor.execute
.
# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")
Hızlı alma için COPY komutu
COPY komutu, PostgreSQL için Azure Cosmos DB'ye veri alırken muazzam aktarım hızı sağlayabilir. COPY komutu, gerçek zamanlı veri alımı için dosyalara veya bellekteki mikro veri toplu işlemlerinden veri alabilir.
Bir dosyadan veri yüklemek için COPY komutu
Aşağıdaki kod bir CSV dosyasındaki verileri veritabanı tablosuna kopyalar. Kod için dosyasınınpharmacies.csvgerekir.
with open('pharmacies.csv', 'r') as f:
# Notice that we don't need the `csv` module.
next(f) # Skip the header row.
cursor.copy_from(f, 'pharmacy', sep=',')
print("copying data completed")
Bellek içi verileri yüklemek için COPY komutu
Aşağıdaki kod bellek içi verileri bir tabloya kopyalar.
data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)
buf.seek(0)
with conn.cursor() as cur:
cur.copy_from(buf, "pharmacy", sep=",")
conn.commit()
conn.close()
Veritabanı isteği hataları için uygulama yeniden denemesi
Bazen uygulamanızdan gelen veritabanı isteklerinin başarısız olması mümkündür. Bu tür sorunlar, uygulama ve veritabanı arasındaki ağ hatası, yanlış parola vb. gibi farklı senaryolarda oluşabilir. Bazı sorunlar geçici olabilir ve birkaç saniye ile dakika arasında çözülebilir. Geçici hataların üstesinden gelmek için uygulamanızda yeniden deneme mantığını yapılandırabilirsiniz.
Uygulamanızda yeniden deneme mantığını yapılandırmak, son kullanıcı deneyimini geliştirmeye yardımcı olur. Hata senaryoları altında kullanıcılar, hatalarla karşılaşmak yerine uygulamanın isteklere hizmet vermesini yalnızca biraz daha bekler.
Aşağıdaki örnekte, uygulamanızda yeniden deneme mantığının nasıl uygulandığı gösterilmektedir. Örnek kod parçacığı, başarılı olana kadar her 60 saniyede bir (en fazla beş kez) bir veritabanı isteği dener. Yeniden deneme sayısı ve sıklığı, uygulamanızın gereksinimlerine göre yapılandırılabilir.
Bu kodda kümeyi küme> adınızla ve <parolanızla> yönetici parolanızla değiştirin<.
import psycopg2
import time
from psycopg2 import pool
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
def executeRetry(query, retryCount):
for x in range(retryCount):
try:
if (postgreSQL_pool):
# Use getconn() to Get Connection from connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
break
except Exception as err:
print(err)
postgreSQL_pool.putconn(conn)
time.sleep(60)
return None
print(executeRetry("select 1", 5))
Sonraki adımlar
- PostgreSQL için Azure Cosmos DB API'sinin PostgreSQL'i nasıl genişlettiklerini görün ve yararlı tanılama sorgularını deneyin
- İş yükünüz için en iyi küme boyutunu seçin
- Küme performansını izleme