مشاركة عبر


استخدام Python للاتصال وتشغيل أوامر SQL على Azure Cosmos DB ل PostgreSQL

هام

لم يعد Azure Cosmos DB ل PostgreSQL مدعوما للمشاريع الجديدة. لا تستخدم هذه الخدمة لمشاريع جديدة. بدلا من ذلك، استخدم إحدى هاتين الخدمتين:

يوضح لك هذا التشغيل السريع كيفية استخدام تعليمة Python البرمجية للاتصال بمجموعة، واستخدام عبارات SQL لإنشاء جدول. ستقوم بعد ذلك بإدراج البيانات والاستعلام عنها وتحديثها وحذفها في قاعدة البيانات. تفترض الخطوات الواردة في هذه المقالة أنك على دراية بتطوير Python، وأنك جديد في العمل مع Azure Cosmos DB ل PostgreSQL.

تثبيت مكتبة PostgreSQL

تتطلب أمثلة التعليمات البرمجية في هذه المقالة مكتبة psycopg2 . ستحتاج إلى تثبيت psycopg2 مع مدير حزمة اللغة (مثل pip).

الاتصال وإنشاء جدول وإدراج البيانات

ينشئ مثال التعليمات البرمجية التالي تجمع اتصال بقاعدة بيانات Postgres. ثم يستخدم دوال cursor.execute مع عبارات SQL CREATE TABLE وINSERT INTO لإنشاء جدول وإدراج البيانات.

تلميح

يستخدم نموذج التعليمات البرمجية أدناه مجموعة اتصال لإنشاء اتصالات إلى PostgreSQL وإدارتها. يوصى بشدة بمجموعة الاتصال من جانب التطبيق لأن:

  • ويضمن أن التطبيق لا ينشئ اتصالات كثيرة جدًا بقاعدة البيانات، وبالتالي يتجنب تجاوز حدود الاتصال.
  • يمكن أن يساعد على تحسين الأداء بشكل كبير - كل من زمن الانتقال ومعدل النقل. يجب أن تتشعب عملية خادم PostgreSQL لمعالجة كل اتصال جديد، وأن تتجنب إعادة استخدام الاتصال هذا الحمل.

في التعليمات البرمجية التالية، استبدل <نظام المجموعة> باسم نظام المجموعة وكلمة <المرور> بكلمة مرور المسؤول أو الرمز المميز لمعرف Microsoft Entra.

إشعار

يغلق هذا المثال الاتصال في النهاية، لذلك إذا كنت تريد تشغيل العينات الأخرى في المقالة في نفس جلسة العمل، فلا تقم بتضمين # Clean up المقطع عند تشغيل هذا النموذج.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

عند نجاح تشغيل التعليمة البرمجية يُنتج المخرجات التالية:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

توزيع الجداول

يمنحك Azure Cosmos DB ل PostgreSQL القوة الفائقة لتوزيع الجداول عبر عقد متعددة لقابلية التوسع. يمكنك الأمر أدناه من توزيع جدول. يمكنك معرفة المزيد حول create_distributed_tableعمود التوزيع هنا.

إشعار

يتيح لهم توزيع الجداول النمو عبر أي عقد عاملة تمت إضافتها إلى نظام المجموعة.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

اقرأ البيانات

يستخدم مثال التعليمات البرمجية التالي واجهات برمجة التطبيقات التالية لقراءة البيانات من قاعدة البيانات:

  • cursor.execute مع عبارة SQL SELECT لقراءة البيانات.
  • cursor.fetchall() لقبول استعلام وإرجاع مجموعة نتائج إلى التكرار.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

تحديث البيانات

يستخدم cursor.execute مثال التعليمات البرمجية التالي مع عبارة SQL UPDATE لتحديث البيانات.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

حذف البيانات

يتم تشغيل cursor.execute مثال التعليمات البرمجية التالي مع عبارة SQL DELETE لحذف البيانات.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

أمر COPY للاستيعاب السريع

يمكن أن ينتج عن أمر COPY إنتاجية هائلة أثناء استيعاب البيانات في Azure Cosmos DB ل PostgreSQL. يمكن لأمر COPY استيعاب البيانات في الملفات، أو من دفعات صغيرة من البيانات في الذاكرة لاستيعابها في الوقت الحقيقي.

الأمر COPY لتحميل البيانات من ملف

تنسخ التعليمات البرمجية التالية البيانات من ملف CSV إلى جدول قاعدة بيانات. تتطلب التعليمات البرمجية pharmacies.csv الملف.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

الأمر COPY لتحميل البيانات في الذاكرة

تنسخ التعليمات البرمجية التالية البيانات الموجودة في الذاكرة إلى جدول.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

إعادة محاولة التطبيق لحالات فشل طلب قاعدة البيانات

من الممكن أحياناً أن تفشل طلبات قاعدة البيانات من التطبيق. يمكن أن تحدث مثل هذه المشكلات في ظل سيناريوهات مختلفة، مثل فشل الشبكة بين التطبيق وقاعدة البيانات، وكلمة المرور غير الصحيحة، وما إلى ذلك. قد تكون بعض المشكلات عابرة، وتحل نفسها في بضع ثوان إلى دقائق. يمكنك تكوين منطق إعادة المحاولة في تطبيقك للتغلب على الأخطاء العابرة.

يساعد تكوين منطق إعادة المحاولة في تطبيقك على تحسين تجربة المستخدم. في ظل سيناريوهات الفشل، سينتظر المستخدمون وقتاً أطول قليلاً حتى يقوم التطبيق بخدمة الطلبات، بدلاً من مواجهة الأخطاء.

يوضح المثال أدناه كيفية تنفيذ منطق إعادة المحاولة في تطبيقك. يحاول نموذج القصاصة البرمجية طلب قاعدة بيانات كل 60 ثانية (حتى خمس مرات) حتى ينجح. يمكن تكوين عدد مرات إعادة المحاولة وتكرارها استناداً إلى احتياجات التطبيق.

في هذه التعليمة البرمجية، استبدل <نظام المجموعة> باسم نظام المجموعة وكلمة <المرور بكلمة مرور> المسؤول.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

الخطوات التالية