إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
في هذا البرنامج التعليمي، يمكنك استخدام موصل Azure Cosmos DB Spark لقراءة البيانات أو كتابتها من حساب Azure Cosmos DB لحساب NoSQL. يستخدم هذا البرنامج التعليمي Azure Databricks ودفتر ملاحظات Jupyter لتوضيح كيفية التكامل مع واجهة برمجة التطبيقات ل NoSQL من Spark. يركز هذا البرنامج التعليمي على Python وSc scala، على الرغم من أنه يمكنك استخدام أي لغة أو واجهة يدعمها Spark.
في هذا البرنامج التعليمي، تتعلم كيفية:
- الاتصال بواجهة برمجة تطبيقات لحساب NoSQL باستخدام Spark ودفتر ملاحظات Jupyter.
- إنشاء موارد حاويات وقواعد بيانات.
- استيعاب البيانات إلى الحاوية.
- الاستعلام عن البيانات في الحاوية.
- تنفيذ العمليات الشائعة على العناصر الموجودة في الحاوية.
المتطلبات الأساسية
- حساب Azure Cosmos DB ل NoSQL موجود.
- إذا كان لديك اشتراك Azure موجود، فبادر بإنشاء حساب جديد.
- مساحة عمل Azure Databricks موجودة.
الاتصال باستخدام Spark وJupyter
استخدم مساحة عمل Azure Databricks الحالية لإنشاء مجموعة حوسبة جاهزة لاستخدام Apache Spark 3.4.x للاتصال بحساب Azure Cosmos DB الخاص بك ل NoSQL.
افتح مساحة عمل Azure Databricks.
في واجهة مساحة العمل، قم بإنشاء نظام مجموعة جديد. قم بتكوين نظام المجموعة باستخدام هذه الإعدادات، كحد أدنى:
Version قيمه إصدار وقت التشغيل 13.3 LTS (Scala 2.12، Spark 3.4.1) استخدم واجهة مساحة العمل للبحث عن حزم Maven من Maven Central باستخدام معرف المجموعة .
com.azure.cosmos.sparkقم بتثبيت الحزمة خصيصا ل Spark 3.4 مع معرف قطعة أثرية مسبوقةazure-cosmos-spark_3-4بنظام المجموعة.أخيرا ، قم بإنشاء دفتر ملاحظات جديد.
تلميح
بشكل افتراضي، يتم إرفاق دفتر الملاحظات بالمجموعة التي تم إنشاؤها مؤخرا.
ضمن دفتر الملاحظات، قم بتعيين إعدادات تكوين معالجة المعاملات عبر الإنترنت (OLTP) لنقطة نهاية حساب NoSQL واسم قاعدة البيانات واسم الحاوية.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
إنشاء قاعدة بيانات وحاوية
استخدم واجهة برمجة تطبيقات الكتالوج لإدارة موارد الحساب مثل قواعد البيانات والحاويات. بعد ذلك، يمكنك استخدام OLTP لإدارة البيانات داخل موارد الحاوية.
تكوين واجهة برمجة تطبيقات الكتالوج لإدارة واجهة برمجة التطبيقات لموارد NoSQL باستخدام Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))إنشاء قاعدة بيانات جديدة باسم
cosmicworksباستخدامCREATE DATABASE IF NOT EXISTS.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")إنشاء حاوية جديدة باسم
productsباستخدامCREATE TABLE IF NOT EXISTS. تأكد من تعيين مسار مفتاح القسم إلى/categoryوتمكين معدل نقل التحجيم التلقائي مع الحد الأقصى لمعدل نقل1000وحدات الطلب (RUs) في الثانية.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))إنشاء حاوية أخرى باسم
employeesباستخدام تكوين مفتاح قسم هرمي. استخدم/organizationو/departmentو/teamكمسارات مفتاح القسم. اتبع هذا الترتيب المحدد. أيضا ، قم بتعيين معدل النقل إلى عدد يدوي من400وحدات النقل.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))قم بتشغيل خلايا دفتر الملاحظات للتحقق من إنشاء قاعدة البيانات والحاويات داخل واجهة برمجة التطبيقات لحساب NoSQL.
استيعاب البيانات
إنشاء عينة مجموعة بيانات. ثم استخدم OLTP لاستيعاب تلك البيانات إلى واجهة برمجة التطبيقات لحاوية NoSQL.
إنشاء عينة مجموعة بيانات.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )استخدم
spark.createDataFrameوتكوين OLTP المحفوظ مسبقا لإضافة بيانات نموذجية إلى الحاوية الهدف.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
بيانات الاستعلام
تحميل بيانات OLTP في إطار بيانات لإجراء استعلامات شائعة على البيانات. يمكنك استخدام بناء الجمل المختلفة لتصفية البيانات أو الاستعلام عن البيانات.
استخدم
spark.readلتحميل بيانات OLTP في كائن إطار بيانات. استخدم نفس التكوين الذي استخدمته سابقا في هذا البرنامج التعليمي. أيضا، قم بتعيينspark.cosmos.read.inferSchema.enabledإلىtrueللسماح لموصل Spark باستنتاج المخطط عن طريق أخذ عينات من العناصر الموجودة.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()عرض مخطط البيانات المحملة في إطار البيانات باستخدام
printSchema.# Render schema df.printSchema()// Render schema df.printSchema()عرض صفوف البيانات حيث
quantityيكون العمود أقل من20.whereاستخدم الدالتين وshowلتنفيذ هذا الاستعلام.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()عرض صف البيانات الأول حيث
clearanceيكونtrueالعمود . استخدم الدالةfilterلتنفيذ هذا الاستعلام.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)عرض خمسة صفوف من البيانات بدون عامل تصفية أو اقتطاع. استخدم الدالة
showلتخصيص مظهر وعدد الصفوف التي يتم عرضها.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)استعلم عن بياناتك باستخدام سلسلة استعلام NoSQL الأولية هذه:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
تنفيذ العمليات الشائعة
عند العمل مع واجهة برمجة التطبيقات لبيانات NoSQL في Spark، يمكنك إجراء تحديثات جزئية أو العمل مع البيانات ك JSON أولي.
لإجراء تحديث جزئي لعنصر:
انسخ متغير التكوين الموجود
configوعدل الخصائص في النسخة الجديدة. على وجه التحديد، قم بتكوين استراتيجية الكتابة إلىItemPatch. ثم قم بتعطيل الدعم المجمع. تعيين الأعمدة والعمليات المعينة. وأخيرا، قم بتعيين نوع العملية الافتراضية إلىSet.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )إنشاء متغيرات لمفتاح قسم العنصر والمعرف الفريد الذي تنوي استهدافه كجزء من عملية التصحيح هذه.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"إنشاء مجموعة من كائنات التصحيح لتحديد العنصر الهدف وتحديد الحقول التي يجب تعديلها.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )إنشاء إطار بيانات باستخدام مجموعة من كائنات التصحيح. استخدم
writeلتنفيذ عملية التصحيح.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()قم بتشغيل استعلام لمراجعة نتائج عملية التصحيح. يجب الآن تسمية
Yamba New Surfboardالعنصر دون أي تغييرات أخرى.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
للعمل مع بيانات JSON الأولية:
انسخ متغير التكوين الموجود
configوعدل الخصائص في النسخة الجديدة. على وجه التحديد، قم بتغيير الحاوية الهدف إلىemployees. ثم قم بتكوينcontactsالعمود/الحقل لاستخدام بيانات JSON الأولية.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )إنشاء مجموعة من الموظفين لاستيعابها في الحاوية.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )إنشاء إطار بيانات واستخدامه
writeلاستيعاب بيانات الموظف.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()عرض البيانات من إطار البيانات باستخدام
show. لاحظ أنcontactsالعمود هو JSON الخام في الإخراج.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
المحتويات ذات الصلة
- أباتشي سبارك
- واجهة برمجة تطبيقات كتالوج Azure Cosmos DB
- مرجع معلمة التكوين
- نماذج موصل Azure Cosmos DB Spark
- الترحيل من Spark 2.4 إلى Spark 3.*
- الإصدارات المهملة:
- تم إهمال Azure Cosmos DB Spark Connector ل Spark 3.1 و3.2، لأنه لا توجد أوقات تشغيل Spark 3.1 أو 3.2 مدعومة في Azure Databricks أو Azure Synapse أو Azure HDInsight متوفرة بعد الآن.
- دليل الترحيل للتحديث من Spark 3.1
- دليل الترحيل للتحديث من Spark 3.2
- توافق الإصدار:
- ملاحظات الإصدار:
- روابط التنزيل: