مقدمة حول Microsoft Spark Utilities

Microsoft Spark Utilities (MSSparkUtils) هي حزمة مضمنة لمساعدتك على أداء المهام الشائعة بسهولة. يمكنك استخدام MSSparkUtils للعمل مع أنظمة الملفات، والحصول على متغيرات البيئة، وتسلسل دفاتر الملاحظات معا، والعمل مع البيانات السرية. تتوفر MSSparkUtils في PySpark (Python)و Scala.NET Spark (C#)و ودفاتر R (Preview) الملاحظات وخطوط أنابيب Synapse.

المتطلبات الأساسية

قم بتكوين الوصول إلى Azure Data Lake Storage Gen2

تستخدم دفاتر ملاحظات Synapse المرور من Microsoft Entra للوصول إلى حسابات ADLS Gen2. يجب أن تكون Storage Blob Data Contributor للوصول إلى حساب (أو مجلد) ADLS Gen2.

تستخدم مسارات Synapse هوية الخدمة المدارة (MSI) لمساحة العمل للوصول إلى حسابات التخزين. لاستخدام MSSparkUtils في أنشطة البنية الأساسية لبرنامج ربط العمليات التجارية، يجب أن تكون هوية مساحة العمل الخاصة بك هي Storage Blob Data Contributor للوصول إلى حساب (أو مجلد) ADLS Gen2.

اتبع هذه الخطوات للتأكد من أن معرف Microsoft Entra ومساحة العمل MSI لديك لديه حق الوصول إلى حساب ADLS Gen2:

  1. افتح مدخل Microsoft Azure وحساب التخزين الذي تريد الوصول إليه. يمكنك الانتقال إلى الحاوية المحددة التي تريد الوصول إليها.

  2. حدد ارتباط التحكم في الوصول (IAM) في اللوحة اليمنى.

  3. حدد Add>Add role assignment لفتح صفحة إضافة تعيين الدور.

  4. تعيين الدور التالي. للحصول على خطوات تفصيلية، راجع تعيين أدوار Azure باستخدام مدخل Azure.

    الإعداد القيمة‬
    الدور المساهم في بيانات مخزن البيانات الثنائية الكبيرة
    تعيين الوصول إلى USER and MANAGEDIDENTITY
    الأعضاء حساب Microsoft Entra وهوية مساحة العمل

    إشعار

    اسم الهوية المدارة هو أيضاً اسم مساحة العمل.

    صفحة Add role assignment في مدخل Microsoft Azure.

  5. حدد حفظ.

يمكنك الوصول إلى البيانات على ADLS Gen2 باستخدام Synapse Spark عبر عنوان URL التالي:

abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>

تكوين الوصول إلى Azure Blob Storage

يستخدم Synapse توقيع الوصول المشترك (SAS) للوصول إلى Azure Blob Storage. لتجنب كشف مفاتيح SAS في التعليمات البرمجية، نوصي بإنشاء خدمة مرتبطة جديدة في مساحة عمل Synapse لحساب Azure Blob Storage الذي تريد الوصول إليه.

اتبع هذه الخطوات لإضافة خدمة مرتبطة جديدة لحساب Azure Blob Storage:

  1. افتح Azure Synapse Studio.
  2. حدد إدارة من اللوحة اليسرى وحدد الخدمات المرتبطة ضمن الاتصالات الخارجية.
  3. ابحث في Azure Blob Storage في لوحة الخدمة المرتبطة الجديدة على اليمين.
  4. حدد متابعة.
  5. حدد حساب تخزين Azure Blob للوصول إلى اسم الخدمة المرتبطة وتكوينه. اقترح استخدام مفتاح الحساب لأسلوب المصادقة.
  6. حدد اختبار الاتصال للتحقق من صحة الإعدادات.
  7. حدد إنشاء أولا وانقر فوق نشر الكل لحفظ التغييرات.

يمكنك الوصول إلى البيانات على Azure Blob Storage باستخدام Synapse Spark عبر عنوان URL التالي:

wasb[s]://<container_name>@<storage_account_name>.blob.core.windows.net/<path>

فيما يلي مثال على التعليمات البرمجية:

from pyspark.sql import SparkSession

# Azure storage access info
blob_account_name = 'Your account name' # replace with your blob name
blob_container_name = 'Your container name' # replace with your container name
blob_relative_path = 'Your path' # replace with your relative folder path
linked_service_name = 'Your linked service name' # replace with your linked service name

blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from Blob remotely

wasb_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)

spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + wasb_path)
val blob_account_name = "" // replace with your blob name
val blob_container_name = "" //replace with your container name
val blob_relative_path = "/" //replace with your relative folder path
val linked_service_name = "" //replace with your linked service name


val blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

val wasbs_path = f"wasbs://$blob_container_name@$blob_account_name.blob.core.windows.net/$blob_relative_path"
spark.conf.set(f"fs.azure.sas.$blob_container_name.$blob_account_name.blob.core.windows.net",blob_sas_token)

var blob_account_name = "";  // replace with your blob name
var blob_container_name = "";     // replace with your container name
var blob_relative_path = "";  // replace with your relative folder path
var linked_service_name = "";    // replace with your linked service name
var blob_sas_token = Credentials.GetConnectionStringOrCreds(linked_service_name);

spark.Conf().Set($"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net", blob_sas_token);

var wasbs_path = $"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}";

Console.WriteLine(wasbs_path);

# Azure storage access info
blob_account_name <- 'Your account name' # replace with your blob name
blob_container_name <- 'Your container name' # replace with your container name
blob_relative_path <- 'Your path' # replace with your relative folder path
linked_service_name <- 'Your linked service name' # replace with your linked service name

blob_sas_token <- mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from Blob remotely
sparkR.session()
wasb_path <- sprintf('wasbs://%s@%s.blob.core.windows.net/%s',blob_container_name, blob_account_name, blob_relative_path)
sparkR.session(sprintf('fs.azure.sas.%s.%s.blob.core.windows.net',blob_container_name, blob_account_name), blob_sas_token)

print( paste('Remote blob path: ',wasb_path))

تكوين الوصول إلى Azure Key Vault

يمكنك إضافة Azure Key Vault كخدمة مرتبطة لإدارة بيانات الاعتماد الخاصة بك في Synapse. اتبع هذه الخطوات لإضافة Azure Key Vault كخدمة مرتبطة ب Synapse:

  1. افتح Azure Synapse Studio.

  2. حدد إدارة من اللوحة اليسرى وحدد الخدمات المرتبطة ضمن الاتصالات الخارجية.

  3. ابحث عن Azure Key Vault في لوحة الخدمة المرتبطة الجديدة على اليمين.

  4. حدد حساب Azure Key Vault للوصول إلى اسم الخدمة المرتبطة وتكوينه.

  5. حدد اختبار الاتصال للتحقق من صحة الإعدادات.

  6. حدد إنشاء أولا وانقر فوق نشر الكل لحفظ التغييرات.

تستخدم دفاتر ملاحظات Synapse المرور من Microsoft Entra للوصول إلى Azure Key Vault. تستخدم مسارات Synapse هوية مساحة العمل (MSI) للوصول إلى Azure Key Vault. للتأكد من أن التعليمات البرمجية تعمل في كل من دفتر الملاحظات وفي مسار Synapse، نوصي بمنح إذن الوصول السري لكل من حساب Microsoft Entra وهوية مساحة العمل.

اتبع هذه الخطوات لمنح الوصول السري إلى هوية مساحة العمل الخاصة بك:

  1. افتح مدخل Microsoft Azure وAzure Key Vault الذي تريد الوصول إليه.
  2. حدد نُهُج الوصول من القائمة اليسرى.
  3. حدد إضافة نهج الوصول:
    • اختر Key وSecret و Certificate Management كقالب تكوين.
    • حدد حساب Microsoft Entra وهوية مساحة العمل (مثل اسم مساحة العمل) في أساس التحديد أو تأكد من تعيينه بالفعل.
  4. حدد تحديد وإضافة.
  5. حدد زر حفظ لتنفيذ التغييرات.

أدوات نظام الملفات المساعدة

mssparkutils.fsيوفر أدوات مساعدة للعمل مع أنظمة ملفات مختلفة، بما في ذلك Azure Data Lake Storage Gen2 (ADLS Gen2) وAzure Blob Storage. تأكد من تكوين الوصول إلى Azure Data Lake Storage Gen2 وAzure Blob Storage بشكل مناسب.

قم بتشغيل الأوامر التالية للحصول على نظرة عامة على الأساليب المتوفرة:

from notebookutils import mssparkutils
mssparkutils.fs.help()
mssparkutils.fs.help()
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Notebook.MSSparkUtils;
FS.Help()
library(notebookutils)
mssparkutils.fs.help()

النتائج:


mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(src: String, dest: String, create_path: Boolean = False, overwrite: Boolean = False): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory

Use mssparkutils.fs.help("methodName") for more info about a method.

قائمة الملفات

سرد محتوى دليل.

mssparkutils.fs.ls('Your directory path')
mssparkutils.fs.ls("Your directory path")
FS.Ls("Your directory path")
mssparkutils.fs.ls("Your directory path")

إظهار خصائص الملف

إرجاع خصائص الملف بما في ذلك اسم الملف ومسار الملف وحجم الملف ووقت تعديل الملف وما إذا كان دليلا وملفا.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size, file.modifyTime)
val files = mssparkutils.fs.ls("/")
files.foreach{
    file => println(file.name,file.isDir,file.isFile,file.size,file.modifyTime)
}
var Files = FS.Ls("/");
foreach(var File in Files) {
    Console.WriteLine(File.Name+" "+File.IsDir+" "+File.IsFile+" "+File.Size);
}
files <- mssparkutils.fs.ls("/")
for (file in files) {
    writeLines(paste(file$name, file$isDir, file$isFile, file$size, file$modifyTime))
}

إنشاء دليل جديد

يقوم بإنشاء الدليل المحدد إذا لم يكن موجودا وأي دلائل أصل ضرورية.

mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs.mkdirs("new directory name")
FS.Mkdirs("new directory name")
mssparkutils.fs.mkdirs("new directory name")

نسخ الملف

يقوم بنسخ ملف أو دليل. يدعم النسخ عبر أنظمة الملفات.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
FS.Cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)

ملف نسخ مؤدي

يوفر هذا الأسلوب طريقة أسرع لنسخ الملفات أو نقلها، خاصة كميات كبيرة من البيانات.

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True) # Set the third parameter as True to copy all files and directories recursively

إشعار

يدعم الأسلوب فقط في وقت تشغيل Azure Synapse ل Apache Spark 3.3 وAzure Synapse Runtime ل Apache Spark 3.4.

معاينة محتوى الملف

يقوم بإرجاع ما يصل إلى وحدات البايت الأولى 'maxBytes' للملف المحدد كسلسلة مشفرة في UTF-8.

mssparkutils.fs.head('file path', maxBytes to read)
mssparkutils.fs.head("file path", maxBytes to read)
FS.Head("file path", maxBytes to read)
mssparkutils.fs.head('file path', maxBytes to read)

نقل الملف

يقوم بنقل ملف أو دليل. يدعم التنقل عبر أنظمة الملفات.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv("source file or directory", "destination directory", true) // Set the last parameter as True to firstly create the parent directory if it does not exist
FS.Mv("source file or directory", "destination directory", true)
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist

كتابة ملف

يكتب السلسلة المحددة إلى ملف، مرمز في UTF-8.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
FS.Put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

إلحاق محتوى بملف

يلحق السلسلة المحددة إلى ملف، مرمز في UTF-8.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path","content to append",true) // Set the last parameter as True to create the file if it does not exist
FS.Append("file path", "content to append", true) // Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

إشعار

mssparkutils.fs.append()mssparkutils.fs.put() ولا تدعم الكتابة المتزامنة إلى نفس الملف بسبب عدم وجود ضمانات ذرية.

إعادة تسمية الملف أو الدليل

يقوم بإزالة ملف أو دليل

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
FS.Rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

الأدوات المساعدة لدفتر الملاحظات

‏‏غير مدعومة.

يمكنك استخدام الأدوات المساعدة لدفتر ملاحظات MSSparkUtils لتشغيل دفتر ملاحظات أو إنهاء دفتر ملاحظات بقيمة. قم بتشغيل الأوامر التالية للحصول على نظرة عامة على الأساليب المتوفرة:

mssparkutils.notebook.help()

الحصول على النتائج:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

إشعار

لا تنطبق الأدوات المساعدة لدفتر الملاحظات على تعريفات مهام Apache Spark (SJD).

الرجوع إلى دفتر ملاحظات

الرجوع إلى دفتر ملاحظات وإرجاع قيمة الخروج الخاصة به. يمكنك تشغيل استدعاءات دالة التداخل في دفتر ملاحظات بشكل تفاعلي أو في البنية الأساسية لبرنامج ربط العمليات التجارية. سيتم تشغيل دفتر الملاحظات المشار إليه على تجمع Spark الذي يستدعي دفتر الملاحظات هذه الدالة.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

على سبيل المثال:

mssparkutils.notebook.run("folder/Sample1", 90, {"input": 20 })

بعد انتهاء التشغيل، سترى ارتباط لقطة يسمى "عرض تشغيل دفتر الملاحظات: اسم دفتر الملاحظات" الموضح في إخراج الخلية، يمكنك النقر فوق الارتباط لمشاهدة اللقطة لهذا التشغيل المحدد.

لقطة شاشة لرابط سريع python

تشغيل مرجع دفاتر ملاحظات متعددة بالتوازي

يسمح لك الأسلوب mssparkutils.notebook.runMultiple() بتشغيل دفاتر ملاحظات متعددة بالتوازي أو مع بنية طوبولوجية محددة مسبقا. تستخدم واجهة برمجة التطبيقات آلية تنفيذ متعددة مؤشرات الترابط داخل جلسة spark، ما يعني أن موارد الحوسبة تتم مشاركتها بواسطة تشغيل دفتر الملاحظات المرجعي.

باستخدام mssparkutils.notebook.runMultiple()، يمكنك:

  • قم بتنفيذ دفاتر ملاحظات متعددة في وقت واحد، دون انتظار انتهاء كل منها.

  • حدد تبعيات وترتيب التنفيذ لدفاتر الملاحظات، باستخدام تنسيق JSON بسيط.

  • تحسين استخدام موارد حساب Spark وتقليل تكلفة مشاريع Synapse الخاصة بك.

  • عرض لقطات كل سجل تشغيل دفتر ملاحظات في الإخراج، وتتبع أخطاء/مراقبة مهام دفتر الملاحظات بشكل ملائم.

  • احصل على قيمة الخروج لكل نشاط تنفيذي واستخدمها في مهام انتقال البيانات من الخادم.

يمكنك أيضا محاولة تشغيل mssparkutils.notebook.help("runMultiple") للعثور على المثال والاستخدام التفصيلي.

فيما يلي مثال بسيط لتشغيل قائمة دفاتر الملاحظات بالتوازي باستخدام هذا الأسلوب:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

نتيجة التنفيذ من دفتر الملاحظات الجذر كما يلي:

لقطة شاشة للإشارة إلى قائمة دفاتر الملاحظات.

فيما يلي مثال على تشغيل دفاتر الملاحظات ذات البنية الطوبولوجية باستخدام mssparkutils.notebook.runMultiple(). استخدم هذا الأسلوب لتنسيق دفاتر الملاحظات بسهولة من خلال تجربة التعليمات البرمجية.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ]
}
mssparkutils.notebook.runMultiple(DAG)

إشعار

الخروج من دفتر ملاحظات

يقوم بالخروج من دفتر ملاحظات بقيمة. يمكنك تشغيل استدعاءات دالة التداخل في دفتر ملاحظات بشكل تفاعلي أو في البنية الأساسية لبرنامج ربط العمليات التجارية.

  • عند استدعاء دالة exit() من دفتر ملاحظات بشكل تفاعلي، سيقوم Azure Synapse بطرح استثناء، وتخطي تشغيل خلايا التقسيم الفرعي، والحفاظ على جلسة Spark على قيد الحياة.

  • عند تنسيق دفتر ملاحظات يستدعي دالة exit() في مسار Synapse، سيرجع Azure Synapse قيمة إنهاء، ويكمل تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية، ويوقف جلسة Spark.

  • عند استدعاء دالة exit() في دفتر ملاحظات تتم الإشارة إليه، سيوقف Azure Synapse التنفيذ الإضافي في دفتر الملاحظات الذي تتم الإشارة إليه، ويستمر في تشغيل الخلايا التالية في دفتر الملاحظات الذي يستدعي الدالة run(). على سبيل المثال: يحتوي Notebook1 على ثلاث خلايا ويستدعي دالة exit() في الخلية الثانية. يحتوي Notebook2 على خمس خلايا ومكالمات run(notebook1) في الخلية الثالثة. عند تشغيل Notebook2، سيتم إيقاف Notebook1 في الخلية الثانية عند الضغط على الدالة exit(). سيستمر Notebook2 في تشغيل الخلية الرابعة والخلية الخامسة.

mssparkutils.notebook.exit("value string")

على سبيل المثال:

يحدد موقع دفتر ملاحظات Sample1 ضمن المجلد/ مع الخليتين التاليتين:

  • تحدد الخلية 1 معلمة إدخال مع تعيين القيمة الافتراضية إلى 10.
  • تخرج الخلية 2 من دفتر الملاحظات مع الإدخال كقيمة إنهاء.

لقطة شاشة من دفتر ملاحظات نموذجي

يمكنك تشغيل Sample1 في دفتر ملاحظات آخر بقيم افتراضية:


exitVal = mssparkutils.notebook.run("folder/Sample1")
print (exitVal)

النتائج:

Sample1 run success with input is 10

يمكنك تشغيل Sample1 في دفتر ملاحظات آخر وتعيين قيمة الإدخال على 20:

exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print (exitVal)

النتائج:

Sample1 run success with input is 20

يمكنك استخدام الأدوات المساعدة لدفتر ملاحظات MSSparkUtils لتشغيل دفتر ملاحظات أو إنهاء دفتر ملاحظات بقيمة. قم بتشغيل الأوامر التالية للحصول على نظرة عامة على الأساليب المتوفرة:

mssparkutils.notebook.help()

الحصول على النتائج:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

الرجوع إلى دفتر ملاحظات

الرجوع إلى دفتر ملاحظات وإرجاع قيمة الخروج الخاصة به. يمكنك تشغيل استدعاءات دالة التداخل في دفتر ملاحظات بشكل تفاعلي أو في البنية الأساسية لبرنامج ربط العمليات التجارية. سيتم تشغيل دفتر الملاحظات المشار إليه على تجمع Spark الذي يستدعي دفتر الملاحظات هذه الدالة.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

على سبيل المثال:

mssparkutils.notebook.run("folder/Sample1", 90, Map("input" -> 20))

بعد انتهاء التشغيل، سترى ارتباط لقطة يسمى "عرض تشغيل دفتر الملاحظات: اسم دفتر الملاحظات" الموضح في إخراج الخلية، يمكنك النقر فوق الارتباط لمشاهدة اللقطة لهذا التشغيل المحدد.

لقطة شاشة لرابط سريع scala

الخروج من دفتر ملاحظات

يقوم بالخروج من دفتر ملاحظات بقيمة. يمكنك تشغيل استدعاءات دالة التداخل في دفتر ملاحظات بشكل تفاعلي أو في البنية الأساسية لبرنامج ربط العمليات التجارية.

  • عند استدعاء دالة exit() دفتر ملاحظات بشكل تفاعلي، سيقوم Azure Synapse بطرح استثناء، وتخطي تشغيل الخلايا الفرعية، والحفاظ على جلسة Spark قيد التشغيل.

  • عند تنسيق دفتر ملاحظات يستدعي دالة exit() في مسار Synapse، سيرجع Azure Synapse قيمة إنهاء، ويكمل تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية، ويوقف جلسة Spark.

  • عند استدعاء دالة exit() في دفتر ملاحظات تتم الإشارة إليه، سيوقف Azure Synapse التنفيذ الإضافي في دفتر الملاحظات الذي تتم الإشارة إليه، ويستمر في تشغيل الخلايا التالية في دفتر الملاحظات الذي يستدعي الدالة run(). على سبيل المثال: يحتوي Notebook1 على ثلاث خلايا ويستدعي دالة exit() في الخلية الثانية. يحتوي Notebook2 على خمس خلايا ومكالمات run(notebook1) في الخلية الثالثة. عند تشغيل Notebook2، سيتم إيقاف Notebook1 في الخلية الثانية عند الضغط على الدالة exit(). سيستمر Notebook2 في تشغيل الخلية الرابعة والخلية الخامسة.

mssparkutils.notebook.exit("value string")

على سبيل المثال:

يحدد موقع دفتر ملاحظات Sample1 ضمن mssparkutils/folder/ مع الخليتين التاليتين:

  • تحدد الخلية 1 معلمة إدخال مع تعيين القيمة الافتراضية إلى 10.
  • تخرج الخلية 2 من دفتر الملاحظات مع الإدخال كقيمة إنهاء.

لقطة شاشة من دفتر ملاحظات نموذجي

يمكنك تشغيل Sample1 في دفتر ملاحظات آخر بقيم افتراضية:


val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1")
print(exitVal)

النتائج:

exitVal: String = Sample1 run success with input is 10
Sample1 run success with input is 10

يمكنك تشغيل Sample1 في دفتر ملاحظات آخر وتعيين قيمة الإدخال على 20:

val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print(exitVal)

النتائج:

exitVal: String = Sample1 run success with input is 20
Sample1 run success with input is 20

يمكنك استخدام الأدوات المساعدة لدفتر ملاحظات MSSparkUtils لتشغيل دفتر ملاحظات أو إنهاء دفتر ملاحظات بقيمة. قم بتشغيل الأوامر التالية للحصول على نظرة عامة على الأساليب المتوفرة:

mssparkutils.notebook.help()

الحصول على النتائج:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

الرجوع إلى دفتر ملاحظات

الرجوع إلى دفتر ملاحظات وإرجاع قيمة الخروج الخاصة به. يمكنك تشغيل استدعاءات دالة التداخل في دفتر ملاحظات بشكل تفاعلي أو في البنية الأساسية لبرنامج ربط العمليات التجارية. سيتم تشغيل دفتر الملاحظات المشار إليه على تجمع Spark الذي يستدعي دفتر الملاحظات هذه الدالة.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

على سبيل المثال:

mssparkutils.notebook.run("folder/Sample1", 90, list("input": 20))

بعد انتهاء التشغيل، سترى ارتباط لقطة يسمى "عرض تشغيل دفتر الملاحظات: اسم دفتر الملاحظات" الموضح في إخراج الخلية، يمكنك النقر فوق الارتباط لمشاهدة اللقطة لهذا التشغيل المحدد.

الخروج من دفتر ملاحظات

يقوم بالخروج من دفتر ملاحظات بقيمة. يمكنك تشغيل استدعاءات دالة التداخل في دفتر ملاحظات بشكل تفاعلي أو في البنية الأساسية لبرنامج ربط العمليات التجارية.

  • عند استدعاء دالة exit() دفتر ملاحظات بشكل تفاعلي، سيقوم Azure Synapse بطرح استثناء، وتخطي تشغيل الخلايا الفرعية، والحفاظ على جلسة Spark قيد التشغيل.

  • عند تنسيق دفتر ملاحظات يستدعي دالة exit() في مسار Synapse، سيرجع Azure Synapse قيمة إنهاء، ويكمل تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية، ويوقف جلسة Spark.

  • عند استدعاء دالة exit() في دفتر ملاحظات تتم الإشارة إليه، سيوقف Azure Synapse التنفيذ الإضافي في دفتر الملاحظات الذي تتم الإشارة إليه، ويستمر في تشغيل الخلايا التالية في دفتر الملاحظات الذي يستدعي الدالة run(). على سبيل المثال: يحتوي Notebook1 على ثلاث خلايا ويستدعي دالة exit() في الخلية الثانية. يحتوي Notebook2 على خمس خلايا ومكالمات run(notebook1) في الخلية الثالثة. عند تشغيل Notebook2، سيتم إيقاف Notebook1 في الخلية الثانية عند الضغط على الدالة exit(). سيستمر Notebook2 في تشغيل الخلية الرابعة والخلية الخامسة.

mssparkutils.notebook.exit("value string")

على سبيل المثال:

يحدد موقع دفتر ملاحظات Sample1 ضمن المجلد/ مع الخليتين التاليتين:

  • تحدد الخلية 1 معلمة إدخال مع تعيين القيمة الافتراضية إلى 10.
  • تخرج الخلية 2 من دفتر الملاحظات مع الإدخال كقيمة إنهاء.

لقطة شاشة من دفتر ملاحظات نموذجي

يمكنك تشغيل Sample1 في دفتر ملاحظات آخر بقيم افتراضية:


exitVal <- mssparkutils.notebook.run("folder/Sample1")
print (exitVal)

النتائج:

Sample1 run success with input is 10

يمكنك تشغيل Sample1 في دفتر ملاحظات آخر وتعيين قيمة الإدخال على 20:

exitVal <- mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, list("input": 20))
print (exitVal)

النتائج:

Sample1 run success with input is 20

أدوات بيانات الاعتماد المساعدة

يمكنك استخدام MSSparkUtils Credentials Utilities للحصول على رموز الوصول المميزة للخدمات المرتبطة وإدارة البيانات السرية في Azure Key Vault.

قم بتشغيل الأوامر التالية للحصول على نظرة عامة على الأساليب المتوفرة:

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Not supported.
mssparkutils.credentials.help()

الحصول على النتيجة:

getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName

إشعار

حاليا getSecretWithLS(linkedService, secret) غير مدعوم في C#‎.

getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName

الحصول على رمز مميز

إرجاع رمز Microsoft Entra المميز لجمهور معين، الاسم (اختياري). يسرد الجدول أدناه جميع أنواع الجماعات المستهدفة المتوفرة:

نوع الجماعة المستهدفة سلسلة حرفية لاستخدامها في استدعاء واجهة برمجة التطبيقات
تخزين Azure Storage
Azure Key Vault Vault
إدارة Azure AzureManagement
مستودع بيانات Azure SQL (مخصص وبلا خادم) DW
Azure Synapse Synapse
Azure Data Lake Store DataLakeStore
Azure Data Factory ADF
Azure Data Explorer ‏(Kusto) AzureDataExplorer
قاعدة بيانات Azure لـ MySQL AzureOSSDB
قاعدة بيانات Azure لـ MariaDB AzureOSSDB
قاعدة بيانات Azure لـ PostgreSQL AzureOSSDB
mssparkutils.credentials.getToken('audience Key')
mssparkutils.credentials.getToken("audience Key")
Credentials.GetToken("audience Key")
mssparkutils.credentials.getToken('audience Key')

التحقق من صحة الرمز المميز

إرجاع "true" إذا لم تنته صلاحية الرمز المميز.

mssparkutils.credentials.isValidToken('your token')
mssparkutils.credentials.isValidToken("your token")
Credentials.IsValidToken("your token")
mssparkutils.credentials.isValidToken('your token')

الحصول على سلسلة الاتصال أو بيانات الاعتماد للخدمة المرتبطة

يقوم بإرجاع سلسلة الاتصال أو بيانات الاعتماد للخدمة المرتبطة

mssparkutils.credentials.getConnectionStringOrCreds('linked service name')
mssparkutils.credentials.getConnectionStringOrCreds("linked service name")
Credentials.GetConnectionStringOrCreds("linked service name")
mssparkutils.credentials.getConnectionStringOrCreds('linked service name')

الحصول على البيانات السرية باستخدام هوية مساحة العمل

يقوم بإرجاع بيانات Azure Key Vault السرية لاسم azure Key Vault معين واسم سري واسم الخدمة المرتبطة باستخدام هوية مساحة العمل. التأكد من تكوين الوصول إلى Azure Key Vault بشكل مناسب.

mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')
mssparkutils.credentials.getSecret("azure key vault name","secret name","linked service name")
Credentials.GetSecret("azure key vault name","secret name","linked service name")
mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')

الحصول على البيانات السرية باستخدام بيانات اعتماد المستخدم

يقوم بإرجاع بيانات Azure Key Vault السرية لاسم Azure Key Vault معين واسم سري واسم الخدمة المرتبطة باستخدام بيانات اعتماد المستخدم.

mssparkutils.credentials.getSecret('azure key vault name','secret name')
mssparkutils.credentials.getSecret("azure key vault name","secret name")
Credentials.GetSecret("azure key vault name","secret name")
mssparkutils.credentials.getSecret('azure key vault name','secret name')

الحصول على البيانات السرية باستخدام هوية مساحة العمل

يقوم بوضع بيانات Azure Key Vault السرية لاسم azure Key Vault معين واسم سري واسم الخدمة المرتبطة باستخدام هوية مساحة العمل. التأكد من تكوين الوصول إلى Azure Key Vault بشكل مناسب.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')

الحصول على البيانات السرية باستخدام هوية مساحة العمل

يقوم بوضع بيانات Azure Key Vault السرية لاسم azure Key Vault معين واسم سري واسم الخدمة المرتبطة باستخدام هوية مساحة العمل. التأكد من تكوين الوصول إلى Azure Key Vault بشكل مناسب.

mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value","linked service name")

الحصول على البيانات السرية باستخدام هوية مساحة العمل

يقوم بوضع بيانات Azure Key Vault السرية لاسم azure Key Vault معين واسم سري واسم الخدمة المرتبطة باستخدام هوية مساحة العمل. التأكد من تكوين الوصول إلى Azure Key Vault بشكل مناسب.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')

وضع البيانات السرية باستخدام بيانات اعتماد المستخدم

يقوم بوضع بيانات Azure Key Vault السرية لاسم Azure Key Vault معين واسم سري واسم الخدمة المرتبطة باستخدام بيانات اعتماد المستخدم.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')

وضع البيانات السرية باستخدام بيانات اعتماد المستخدم

يقوم بوضع بيانات Azure Key Vault السرية لاسم Azure Key Vault معين واسم سري واسم الخدمة المرتبطة باستخدام بيانات اعتماد المستخدم.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')

وضع البيانات السرية باستخدام بيانات اعتماد المستخدم

يقوم بوضع بيانات Azure Key Vault السرية لاسم Azure Key Vault معين واسم سري واسم الخدمة المرتبطة باستخدام بيانات اعتماد المستخدم.

mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value")

الأدوات المساعدة البيئية

قم بتشغيل الأوامر التالية للحصول على نظرة عامة على الأساليب المتوفرة:

mssparkutils.env.help()
mssparkutils.env.help()
mssparkutils.env.help()
Env.Help()

الحصول على النتيجة:

getUserName(): returns user name
getUserId(): returns unique user id
getJobId(): returns job id
getWorkspaceName(): returns workspace name
getPoolName(): returns Spark pool name
getClusterId(): returns cluster id

الحصول على اسم المستخدم

يقوم بإرجاع اسم المستخدم الحالي.

mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
Env.GetUserName()

الحصول على معرِّف المستخدم

يقوم بإرجاع معرِّف المستخدم الحالي.

mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
Env.GetUserId()

الحصول على معرِّف الوظيفة

يقوم بإرجاع معرِّف الوظيفة.

mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
Env.GetJobId()

الحصول على اسم مساحة العمل

يقم بإرجاع اسم مساحة العمل.

mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
Env.GetWorkspaceName()

الحصول على اسم التجمع

يقم بإرجاع اسم تجمع Spark.

mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
Env.GetPoolName()

الحصول على معرف المجموعة

يقوم بإرجاع معرف نظام المجموعة الحالي.

mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
Env.GetClusterId()

سياق وقت التشغيل

تعرض 3 خصائص لوقت التشغيل Mssparkutils، يمكنك استخدام سياق وقت تشغيل mssparkutils للحصول على الخصائص المدرجة كما يلي:

  • Notebookname - اسم دفتر الملاحظات الحالي، سيرجع دائما قيمة لكل من الوضع التفاعلي ووضع البنية الأساسية لبرنامج ربط العمليات التجارية.
  • Pipelinejobid - معرف تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية، سيرجع القيمة في وضع البنية الأساسية لبرنامج ربط العمليات التجارية ويعيد سلسلة فارغة في الوضع التفاعلي.
  • Activityrunid - معرف تشغيل نشاط دفتر الملاحظات، سيرجع القيمة في وضع البنية الأساسية لبرنامج ربط العمليات التجارية ويعيد سلسلة فارغة في الوضع التفاعلي.

يدعم سياق وقت التشغيل حاليا كلا من Python وSc scala.

mssparkutils.runtime.context
ctx <- mssparkutils.runtime.context()
for (key in ls(ctx)) {
    writeLines(paste(key, ctx[[key]], sep = "\t"))
}
%%spark
mssparkutils.runtime.context

إدارة جلسات العمل

إيقاف جلسة عمل تفاعلية

بدلا من النقر يدويًا فوق زر الإيقاف، في بعض الأحيان يكون من الأنسب إيقاف جلسة تفاعلية عن طريق استدعاء واجهة برمجة التطبيقات في التعليمات البرمجية. لمثل هذه الحالات، نقدم واجهة برمجة تطبيقات mssparkutils.session.stop() لدعم إيقاف الجلسة التفاعلية عبر التعليمات البرمجية، وهي متاحة ل Scala وPython.

mssparkutils.session.stop()
mssparkutils.session.stop()
mssparkutils.session.stop()

mssparkutils.session.stop() ستوقف واجهة برمجة التطبيقات جلسة العمل التفاعلية الحالية بشكل غير متزامن في الخلفية، وتوقف جلسة Spark وتحرر الموارد التي تشغلها الجلسة بحيث تكون متاحة للجلسات الأخرى في نفس التجمع.

إشعار

لا نوصي بواجهات برمجة التطبيقات المضمنة في لغة الاستدعاء مثل sys.exit Scala أو sys.exit() في Python في التعليمات البرمجية الخاصة بك، لأن واجهات برمجة التطبيقات هذه تقتل عملية المترجم الفوري فقط، وتترك جلسة Spark حية ولا يتم تحرير الموارد.

تبعيات الحزمة

إذا كنت ترغب في تطوير دفاتر الملاحظات أو المهام محليا وتحتاج إلى الرجوع إلى الحزم ذات الصلة للترجمة البرمجية/تلميحات IDE، يمكنك استخدام الحزم التالية.

الخطوات التالية