استخدام Python للاتصال وتشغيل أوامر SQL على Azure Cosmos DB ل PostgreSQL

ينطبق على: Azure Cosmos DB ل PostgreSQL (مدعوم بملحق قاعدة بيانات Citus إلى PostgreSQL)

يوضح لك هذا التشغيل السريع كيفية استخدام التعليمات البرمجية ل Python للاتصال بنظام مجموعة، واستخدام عبارات SQL لإنشاء جدول. ثم ستقوم بإدراج البيانات والاستعلام عنها وتحديثها وحذفها في قاعدة البيانات. تفترض الخطوات الواردة في هذه المقالة أنك على دراية بتطوير Python، وأنك حديث العهد بالعمل مع Azure Cosmos DB ل PostgreSQL.

تثبيت مكتبة PostgreSQL

تتطلب أمثلة التعليمات البرمجية في هذه المقالة مكتبة psycopg2 . ستحتاج إلى تثبيت psycopg2 مع مدير حزمة اللغة (مثل pip).

الاتصال وإنشاء جدول وإدراج البيانات

ينشئ مثال التعليمات البرمجية التالي تجمع اتصال بقاعدة بيانات Postgres. ثم يستخدم الدالات cursor.execute مع عبارات SQL CREATE TABLE وINSERT INTO لإنشاء جدول وإدراج البيانات.

تلميح

نموذج التعليمات البرمجية أدناه يستخدم مجموعة اتصال لإنشاء اتصالات إلى PostgreSQL وإدارتها. يوصى بشدة بمجموعة الاتصال من جانب التطبيق لأن:

  • ويضمن أن التطبيق لا ينشئ اتصالات كثيرة جدًا بقاعدة البيانات، وبالتالي يتجنب تجاوز حدود الاتصال.
  • يمكن أن يساعد على تحسين الأداء بشكل كبير - كل من زمن الانتقال ومعدل النقل. يجب أن تتشعب عملية خادم PostgreSQL لمعالجة كل اتصال جديد، وأن تتجنب إعادة استخدام الاتصال هذا الحمل.

في التعليمات البرمجية التالية، استبدل <نظام المجموعة> باسم نظام المجموعة وكلمة <المرور> بكلمة مرور المسؤول.

ملاحظة

يغلق هذا المثال الاتصال في النهاية، لذلك إذا كنت تريد تشغيل العينات الأخرى في المقالة في نفس جلسة العمل، فلا تقم بتضمين # Clean up المقطع عند تشغيل هذا النموذج.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

عند نجاح تشغيل التعليمة البرمجية يُنتج المخرجات التالية:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

توزيع الجداول

يمنحك Azure Cosmos DB ل PostgreSQL القوة الفائقة لتوزيع الجداول عبر عقد متعددة لقابلية التوسع. يمكنك الأمر أدناه من توزيع الجدول. يمكن معرفة المزيد حول create_distributed_table وعمود التوزيع هنا.

ملاحظة

يتيح لها توزيع الجداول النمو عبر أي عقد عاملة تمت إضافتها إلى نظام المجموعة.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

قراءة البيانات

يستخدم مثال التعليمات البرمجية التالي واجهات برمجة التطبيقات التالية لقراءة البيانات من قاعدة البيانات:

  • cursor.exe مع عبارة Select في SQL لقراءة البيانات.
  • cursor.fetchall() لقبول استعلام وإرجاع مجموعة نتائج إلى التكرار.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

تحديث البيانات

يستخدم cursor.execute مثال التعليمات البرمجية التالي مع عبارة SQL UPDATE لتحديث البيانات.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

حذف البيانات

يتم تشغيل cursor.execute مثال التعليمات البرمجية التالي مع عبارة SQL DELETE لحذف البيانات.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

أمر COPY للإيعاب السريع

يمكن أن ينتج عن أمر COPY معدل نقل هائل أثناء استيعاب البيانات في Azure Cosmos DB ل PostgreSQL. يمكن لأمر COPY استيعاب البيانات في الملفات، أو من خلال دفعات صغيرة من البيانات في الذاكرة لاستيعابها في الوقت الحقيقي.

الأمر COPY لتحميل بيانات من ملف

تنسخ التعليمات البرمجية التالية البيانات من ملف CSV إلى جدول قاعدة بيانات. تتطلب التعليمات البرمجية الملفpharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

أمر COPY لتحميل البيانات في الذاكرة

تنسخ التعليمات البرمجية التالية البيانات في الذاكرة إلى جدول.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

إعادة محاولة التطبيق لفشل طلب قاعدة البيانات

من الممكن أحياناً أن تفشل طلبات قاعدة البيانات من التطبيق. يمكن أن تحدث مثل هذه المشكلات في ظل سيناريوهات مختلفة، مثل فشل الشبكة بين التطبيق وقاعدة البيانات، وكلمة المرور غير الصحيحة، وما إلى ذلك. قد تكون بعض المشكلات عابرة، وتحل نفسها في بضع ثوان إلى دقائق. يمكنك تكوين منطق إعادة المحاولة في تطبيقك للتغلب على الأخطاء العابرة.

يساعد تكوين منطق إعادة المحاولة في تطبيقك على تحسين تجربة المستخدم. في ظل سيناريوهات الفشل، سينتظر المستخدمون وقتاً أطول قليلاً حتى يقوم التطبيق بخدمة الطلبات، بدلاً من مواجهة الأخطاء.

يوضح المثال أدناه كيفية تنفيذ منطق إعادة المحاولة في تطبيقك. يحاول نموذج القصاصة البرمجية طلب قاعدة بيانات كل 60 ثانية (حتى خمس مرات) حتى ينجح. يمكن تكوين عدد مرات إعادة المحاولة وتكرارها استناداً إلى احتياجات التطبيق.

في هذه التعليمة البرمجية، استبدل <نظام المجموعة> باسم نظام المجموعة وكلمة <المرور> بكلمة مرور المسؤول.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

الخطوات التالية