اقرأ باللغة الإنجليزية

مشاركة عبر


Azure Synapse Dedicated SQL Pool Connector for Apache Spark

مقدمة

تتيح Azure Synapse Dedicated SQL Pool Connector for Apache في Azure Synapse Analytics النقل الفعال لمجموعات البيانات الكبيرة بين وقت تشغيل Apache Spark وتجمع Dedicated SQL. يتم شحن الموصل كمكتبة افتراضية مع Azure Synapse Workspace. يتم تنفيذ الموصل باستخدام اللغة Scala. يدعم الموصل Scala وPython. لتستخدم Connector مع خيارات لغة دفتر الملاحظات الأخرى، استخدم الأمر Spark magic - %%spark.

على مستوى عالٍ، يوفّر الموصل الإمكانات التالية:

  • اقرأ من Azure Synapse Dedicated SQL Pool:
    • اقرأ مجموعات البيانات الكبيرة من جداول تجمع SQL المخصصة ل Synapse (الداخلية والخارجية) وطرق العرض.
    • دعم شامل لدفع دالة التقييم لأسفل، حيث تُعيّن عوامل التصفية على DataFrame إلى دفع دالة تقييم SQL المقابلة لأسفل.
    • الدعم لتنقيح الأعمدة.
    • دعم دفع الاستعلام لأسفل.
  • الكتابة إلى تجمع SQL المخصص ل Azure Synapse:
    • استيعاب البيانات كبيرة الحجم إلى أنواع الجداول الداخلية والخارجية.
    • يدعم تفضيلات وضع حفظ DataFrame التالية:
      • Append
      • ErrorIfExists
      • Ignore
      • Overwrite
    • يدعم نوع الكتابة إلى الجدول الخارجي تنسيق الملف Parquet وDelimited Text (مثال - CSV).
    • لكتابة البيانات إلى الجداول الداخلية، يستخدم الموصل حاليًا عبارة COPY بدلاً من نهج CETAS/CTAS.
    • تقديم تحسينات لتحسين أداء معدل نقل الكتابة من طرف إلى طرف.
    • تقديم مؤشر اختياري لمعاودة الاتصال (وسيطة دالة Scala) الذي يمكن للعملاء استخدامه لتلقي قياسات ما بعد الكتابة.
      • تتضمن بعض الأمثلة - عدد السجلات والمدة اللازمة لاستكمال إجراء معين وسبب الإخفاق.

نهج التزامن

قراءة

رسم تخطيطي عالي المستوى لتدفق البيانات لوصف تزامن الموصل لطلب القراءة.

كتابة

رسم تخطيطي عالي المستوى لتدفق البيانات لوصف تزامن الموصل لطلب الكتابة.

المتطلبات الأساسية

تتم مناقشة المتطلبات الأساسية مثل إعداد موارد Azure المطلوبة وخطوات تكوينها في هذا القسم.

موارد Azure

مراجعة موارد Azure التابعة التالية وإعدادها:

إعداد قاعدة البيانات

اتصل بقاعدة بيانات Synapse Dedicated SQL Pool وشغل عبارات الإعداد التالية:

  • إنشاء مستخدم قاعدة بيانات تم تعيينه إلى هوية مستخدم Microsoft Entra المستخدمة لتسجيل الدخول إلى مساحة عمل Azure Synapse.

    SQL
    CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;      
    
  • أنشئ مخططًا يتم فيه تعريف الجداول، بحيث يمكن لـ Connector الكتابة إلى الجداول المعنية والقراءة منها بنجاح.

    SQL
    CREATE SCHEMA [<schema_name>];
    

المصادقة

المصادقة المستندة إلى معرف Microsoft Entra

المصادقة المستندة إلى معرف Microsoft Entra هي نهج مصادقة متكامل. يطلب من المستخدم تسجيل الدخول بنجاح إلى مساحة عمل Azure Synapse Analytics.

المصادقة الأساسية

يطلب نهج المصادقة الأساسي من المستخدم تكوين خيارات username وpassword. راجع القسم - خيارات التكوين للتعرف على معلمات التكوين ذات الصلة للقراءة من الجداول والكتابة إليها في Azure Synapse Dedicated SQL Pool.

التصريح

توجد طريقتان لمنح أذونات الوصول إلى Azure Data Lake Storage Gen2 - حساب التخزين:

  • دور التحكم في الوصول استنادًا إلى الدور - دور Storage Blob Data Contributor
    • Storage Blob Data Contributor Role يمنح تعيين أذونات المستخدم للقراءة والكتابة والحذف من حاويات Azure Storage Blob.
    • يوفر التحكم في الوصول استنادًا إلى الدور نهج التحكم الرديء على مستوى الحاوية.
  • قوائم التحكم بالوصول (ACL)
    • يسمح نهج ACL بعناصر تحكم دقيقة على مسارات و/أو ملفات محددة ضمن مجلد معين.
    • لا تُفرض عمليات التحقق من ACL إذا منح المستخدم بالفعل أذونات باستخدام نهج التحكم في الوصول استنادًا إلى الدور.
    • هناك نوعان واسعان من أذونات ACL:
      • أذونات الوصول (المطبقة على مستوى أو عنصر محدد).
      • الأذونات الافتراضية (المطبقة تلقائيًا على كافة العناصر التابعة في وقت إنشائها).
    • يتضمن نوع الأذونات ما يلي:
      • Execute يُمكّن القدرة على اجتياز التسلسلات الهرمية للمجلد أو التنقل فيها.
      • Read يُمكّن القدرة على القراءة.
      • Write يُمكّن القدرة على الكتابة.
    • من الضروري تكوين ACL بحيث يمكن لـ Connector الكتابة والقراءة بنجاح من مواقع التخزين.

ملاحظة

  • في حال كنت ترغب في تشغيل دفاتر الملاحظات باستخدام مسارات Synapse Workspace، يجب عليك أيضًا منح أذونات الوصول المذكورة أعلاه إلى الهوية المُدارة الافتراضية لـ Synapse Workspace. يكون الاسم الافتراضي للهوية المُدارة لمساحة العمل هو نفس اسم مساحة العمل.

  • لاستخدام مساحة عمل Synapse باستخدام حسابات التخزين الآمنة، يجب تكوين نقطة نهاية خاصة مُدارة من دفتر الملاحظات. يجب الموافقة على نقطة النهاية الخاصة المُدارة من القسم Private endpoint connections لحساب التخزين ADLS Gen2 في الجزء Networking.

لتمكين التفاعل الناجح مع Azure Synapse Dedicated SQL Pool، يُعد التخويل التالي ضروريًا ما لم تكن مستخدمًا مكونًا أيضًا كـ Active Directory Admin على نقطة نهاية Dedicated SQL:

  • السيناريو الخاص بالقراءة

    • امنح المستخدم db_exporter باستخدام الإجراء المخزن للنظام sp_addrolemember.

      SQL
      EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
      
  • السيناريو الخاص بالكتابة

    • يستخدم Connector الأمر COPY لكتابة البيانات من عملية التقسيم المرحلي إلى الموقع المُدار للجدول الداخلي.
      • كوّن الأذونات المطلوبة الموضحة هنا.

      • يرد فيما يلي قصاصة برمجية للوصول السريع من نفس:

        SQL
        --Make sure your user has the permissions to CREATE tables in the [dbo] schema
        GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com];
        GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions
        GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has INSERT permissions on the target table
        GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
        

مستندات API

Azure Synapse Dedicated SQL Pool Connector for Apache Spark - وثائق API.

خيارات الإعداد

لتشغيل عملية القراءة أو الكتابة وتنسيقها بشكل ناجح، يتوقع Connector معلمات تكوين معينة. تعريف العنصر - com.microsoft.spark.sqlanalytics.utils.Constants يوفر قائمة بالثوابت الموحدة لكل مفتاح معلمة.

ترد فيما يلي قائمة خيارات التكوين استنادًا إلى سيناريو الاستخدام:

  • القراءة باستخدام المصادقة المستندة إلى معرف Microsoft Entra
    • يتم تعيين بيانات الاعتماد تلقائيا، ولا يطلب من المستخدم توفير خيارات تكوين محددة.
    • وسيطة اسم الجدول المكونة من ثلاثة أجزاء على الأسلوب synapsesql مطلوبة للقراءة من الجدول المعنيّ في Azure Synapse Dedicated SQL Pool.
  • القراءة باستخدام المصادقة الأساسية
    • نقطة نهاية SQL المخصصة ل Azure Synapse
      • Constants.SERVER - نقطة نهاية Synapse Dedicated SQL Pool (اسم المجال المؤهل بالكامل للخادم)
      • Constants.USER - اسم مستخدم SQL.
      • Constants.PASSWORD - كلمة مرور مستخدم SQL.
    • نقطة نهاية Azure Data Lake Storage (Gen 2) - مجلدات التقسيم المرحلي
      • Constants.DATA_SOURCE - يُستخدم مسار التخزين الذي تم تعيينه على معلمة موقع مصدر البيانات للتقسيم المرحلي للبيانات.
  • الكتابة باستخدام المصادقة المستندة إلى معرف Microsoft Entra
    • نقطة نهاية SQL المخصصة ل Azure Synapse
      • بشكل افتراضي، يستنتج Connector نقطة نهاية Synapse Dedicated SQL باستخدام اسم قاعدة البيانات الذي تم تعيينه على معلمة اسم الجدول المكونة من ثلاثة أجزاء للأسلوب synapsesql.
      • بدلاً من ذلك، يمكن للمستخدمين استخدام خيار Constants.SERVER لتحديد نقطة نهاية sql. تأكد من أن نقطة النهاية تستضيف قاعدة البيانات المقابلة مع المخطط المعنيّ.
    • نقطة نهاية Azure Data Lake Storage (Gen 2) - مجلدات التقسيم المرحلي
      • لنوع الجدول الداخلي:
        • كوّن إما خيار Constants.TEMP_FOLDER أو Constants.DATA_SOURCE.
        • إذا اختار المستخدم توفير الخيار Constants.DATA_SOURCE، فسيتم اشتقاق مجلد عملية التقسيم المرحلي باستخدام قيمة location من DataSource.
        • في حال توفير كليهما، فسيتم استخدام قيمة الخيار Constants.TEMP_FOLDER.
        • في حالة عدم وجود خيار مجلد عملية التقسيم المرحلي، سيشتق Connector واحدًا استنادًا إلى تكوين وقت التشغيل - spark.sqlanalyticsconnector.stagingdir.prefix.
      • لنوع الجدول الخارجي:
        • Constants.DATA_SOURCE هو خيار مطلوب للتكوين.
        • يستخدم الموصل مسار التخزين الذي تم تعيينه على معلمة موقع مصدر البيانات بالاشتراك مع الوسيطة location إلى الأسلوب synapsesql ويشتق المسار المطلق لاستمرار بيانات الجدول الخارجي.
        • إذا لم تحدد الوسيطة location إلى الأسلوب synapsesql، فسيشتق الموصل قيمة الموقع كـ <base_path>/dbName/schemaName/tableName.
  • الكتابة باستخدام المصادقة الأساسية
    • نقطة نهاية SQL المخصصة ل Azure Synapse
      • Constants.SERVER - - نقطة نهاية Synapse Dedicated SQL Pool (اسم المجال المؤهل بالكامل للخادم).
      • Constants.USER - اسم مستخدم SQL.
      • Constants.PASSWORD - كلمة مرور مستخدم SQL.
      • Constants.STAGING_STORAGE_ACCOUNT_KEY المقترن بحساب التخزين الذي يستضيف Constants.TEMP_FOLDERS (أنواع الجداول الداخلية فقط) أو Constants.DATA_SOURCE.
    • نقطة نهاية Azure Data Lake Storage (Gen 2) - مجلدات التقسيم المرحلي
      • لا تنطبق بيانات اعتماد المصادقة الأساسية لـ SQL على الوصول إلى نقاط نهاية التخزين.
      • ومن ثمَّ، تأكد من تعيين أذونات الوصول إلى التخزين ذات الصلة كما هو موضح في القسم Azure Data Lake Storage Gen2.

قوالب التعليمة البرمجية

يعرض هذا القسم قوالب التعليمات البرمجية المرجعية لوصف كيفية استخدام واستدعاء Azure Synapse Dedicated SQL Pool Connector for Apache Spark.

ملاحظة

استخدام Connector في Python-

  • يُدعم الموصل في Python لـ Spark 3 فقط. بالنسبة إلى Spark 2.4 (غير مدعوم)، يمكننا استخدام واجهة برمجة تطبيقات موصل Scala للتفاعل مع المحتوى من DataFrame في PySpark باستخدام DataFrame.createOrReplaceTempView أو DataFrame.createOrReplaceGlobalTempView. راجع القسم - استخدام البيانات المُجسدة عبر الخلايا.
  • مؤشر معاودة الاتصال غير متوفر في Python.

القراءة من Azure Synapse Dedicated SQL Pool

طلب القراءة - توقيع الأسلوب synapsesql

Scala
synapsesql(tableName:String="") => org.apache.spark.sql.DataFrame

القراءة من جدول باستخدام المصادقة المستندة إلى معرف Microsoft Entra

Scala
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)

//Show contents of the dataframe
dfToReadFromTable.show()

القراءة من استعلام باستخدام المصادقة المستندة إلى معرف Microsoft Entra

ملاحظة

القيود أثناء القراءة من الاستعلام:

  • يتعذر تحديد اسم الجدول والاستعلام في الوقت نفسه.
  • يسمح باستعلامات التحديد فقط. لا يسمح ب DDL وDML SQLs.
  • لا يتم دفع خيارات التحديد والتصفية على إطار البيانات لأسفل إلى تجمع SQL المخصص عند تحديد استعلام.
  • تتوفر القراءة من استعلام فقط في Spark 3.
Scala
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._


// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
    // Name of the SQL Dedicated Pool or database where to run the query
    // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     // Name of the SQL Dedicated Pool or database where to run the query
     // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>")
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")


//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

القراءة من جدول باستخدام المصادقة الأساسية

Scala
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the table will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)
    

//Show contents of the dataframe
dfToReadFromTable.show()

القراءة من استعلام باستخدام المصادقة الأساسية

Scala
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")

// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
    

//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

الكتابة إلى Azure Synapse Dedicated SQL Pool

طلب الكتابة - توقيع الأسلوب synapsesql

Scala
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None,
           callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit

الكتابة باستخدام المصادقة المستندة إلى معرف Microsoft Entra

يرد فيما يلي قالب تعليمات برمجية شامل يصف كيفية استخدام Connector لسيناريوهات الكتابة:

Scala
//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"

//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")

//Initialize DataFrame that reads CSV data from a given source 
val readDF:DataFrame=spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(1000) //Reads first 1000 rows from the source CSV input.

//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
//    1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
//    2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab. 
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                            Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
    (feedback: Map[String, Any], errorState: Option[Throwable]) => {
    println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
    errorDuringWrite = errorState
}

//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
    write.
    //Configure required configurations.
    options(writeOptionsWithAADAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite).
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL, 
                //Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
                location = None, 
                //Optional parameter to receive a callback.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get

الكتابة باستخدام المصادقة الأساسية

يستبدل مقتطف التعليمات البرمجية التالي تعريف الكتابة الموضح في قسم الكتابة باستخدام المصادقة المستندة إلى معرف Microsoft Entra، لإرسال طلب الكتابة باستخدام نهج المصادقة الأساسية SQL:

Scala
//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                           //Set database user name
                                           Constants.USER -> "<user_name>",
                                           //Set database user's password
                                           Constants.PASSWORD -> "<user_password>",
                                           //Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
                                           Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
                                           //To be used only when writing to internal tables. Storage path will be used for data staging.
                                           Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Configure and submit the request to write to Synapse Dedicated SQL Pool. 
readDF.
    write.
    options(writeOptionsWithBasicAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite). 
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL,
                //Not required for writing to an internal table 
                location = None,
                //Optional parameter.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

في نهج المصادقة الأساسية، لغرض قراءة البيانات من مسار تخزين المصدر، هناك حاجة إلى خيارات تكوين أخرى. توفر القصاصة البرمجية التالية مثالاً للقراءة من مصدر بيانات Azure Data Lake Storage Gen2 باستخدام بيانات اعتماد كيان الخدمة:

Scala
//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
                                "delimiter"->",", 
                                "fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
                                s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
                                s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" -> 
                                    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                                "fs.azure.account.oauth2.client.id" -> s"$spnClientId",
                                "fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
                                "fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
                                "fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
                                "fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(100)

أوضاع حفظ DataFrame المدعومة

تُدعم أوضاع الحفظ التالية عند كتابة بيانات المصدر إلى جدول وجهة في Azure Synapse Dedicated SQL Pool:

  • ErrorIfExists (وضع الحفظ الافتراضي)
    • إذا كان الجدول الوجهة موجودًا، فسيتم تعطيل الكتابة مع إرجاع استثناء إلى المُتّصل به. وإلا، يتم إنشاء جدول جديد ببيانات من مجلدات عملية التقسيم المرحلي.
  • تجاهل
    • إذا كان الجدول الوجهة موجودًا، فستتجاهل الكتابة طلب الكتابة دون إرجاع خطأ. وإلا، يتم إنشاء جدول جديد ببيانات من مجلدات عملية التقسيم المرحلي.
  • الكتابه
    • إذا كان الجدول الوجهة موجودًا، فسيتم استبدال البيانات الموجودة في الوجهة ببيانات من مجلدات عملية التقسيم المرحلي. وإلا، يتم إنشاء جدول جديد ببيانات من مجلدات عملية التقسيم المرحلي.
  • ألحق
    • إذا كان الجدول الوجهة موجودًا، فسيتم إلحاق البيانات الجديدة به. وإلا، يتم إنشاء جدول جديد ببيانات من مجلدات عملية التقسيم المرحلي.

مؤشر معاودة الاتصال لطلب الكتابة

قدمت تغييرات API لمسار الكتابة الجديد ميزة تجريبية لتزويد العميل بخريطة قيمة->المفتاح لقياسات ما بعد الكتابة. يتم تعريف مفاتيح القياسات في تعريف العنصر الجديد - Constants.FeedbackConstants. يمكن استرداد القياسات كسلسلة JSON عن طريق التمرير في مؤشر معاودة الاتصال (Scala Function). يرد فيما يلي توقيع الدالة:

Scala
//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit

يرد فيما يلي بعض القياسات البارزة (المقدمة بطريقة camel case):

  • WriteFailureCause
  • DataStagingSparkJobDurationInMilliseconds
  • NumberOfRecordsStagedForSQLCommit
  • SQLStatementExecutionDurationInMilliseconds
  • rows_processed

يرد فيما يلي نموذج لسلسلة JSON مع قياسات ما بعد الكتابة:

doc
{
 SparkApplicationId -> <spark_yarn_application_id>,
 SQLStatementExecutionDurationInMilliseconds -> 10113,
 WriteRequestReceivedAtEPOCH -> 1647523790633,
 WriteRequestProcessedAtEPOCH -> 1647523808379,
 StagingDataFileSystemCheckDurationInMilliseconds -> 60,
 command -> "COPY INTO [schema_name].[table_name] ...",
 NumberOfRecordsStagedForSQLCommit -> 100,
 DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
 SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
 DataStagingSparkJobDurationInMilliseconds -> 5252,
 rows_processed -> 100,
 SaveModeApplied -> TRUNCATE_COPY,
 DurationInMillisecondsToValidateFileFormat -> 75,
 status -> Completed,
 SparkApplicationName -> <spark_application_name>,
 ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
 request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
 StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
 JDBCConfigurationsSetupAtEPOCH -> 193,
 StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
 FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
 SchemaInferenceCheckDurationInMilliseconds -> 91,
 SaveModeRequested -> Overwrite,
 DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
 DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}

المزيد من نماذج التعليمات البرمجية

استخدام البيانات المُجسدة عبر الخلايا

يمكن استخدام createOrReplaceTempView لـ Spark DataFrame للوصول إلى البيانات التي تم إحضارها إلى خلية أخرى، عن طريق تسجيل طريقة عرض مؤقتة.

  • الخلية التي يتم إحضار البيانات إليها (على سبيل المثال مع تفضيل لغة دفتر الملاحظات كـ Scala)
Scala
    //Necessary imports
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.SaveMode
    import com.microsoft.spark.sqlanalytics.utils.Constants
    import org.apache.spark.sql.SqlAnalyticsConnector._
    
    //Configure options and read from Synapse Dedicated SQL Pool.
    val readDF = spark.read.
        //Set Synapse Dedicated SQL End Point name.
        option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
        //Set database user name.
        option(Constants.USER, "<user_name>").
        //Set database user's password. 
        option(Constants.PASSWORD, "<user_password>").
        //Set name of the data source definition that is defined with database scoped credentials.
        option(Constants.DATA_SOURCE,"<data_source_name>").
        //Set the three-part table name from which the read must be performed.
        synapsesql("<database_name>.<schema_name>.<table_name>").
        //Optional - specify number of records the DataFrame would read.
        limit(10)
    //Register the temporary view (scope - current active Spark Session)
    readDF.createOrReplaceTempView("<temporary_view_name>")
  • الآن، غيّر تفضيل اللغة على دفتر الملاحظات إلى PySpark (Python) وأحضر البيانات من طريقة العرض المسجلة <temporary_view_name>
Python
        spark.sql("select * from <temporary_view_name>").show()

معالجة الاستجابة

يحتوي استدعاء synapsesql على حالتي نهاية محتملتين - حالة نجاح أو حالة فشل. يصف هذا القسم كيفية معالجة استجابة الطلب كل سيناريو.

استجابة طلب القراءة

عند الانتهاء، تُعرض قصاصة استجابة القراءة في إخراج الخلية. كما سيؤدي الفشل في الخلية الحالية إلى إلغاء عمليات تنفيذ الخلايا اللاحقة. تتوفر معلومات تفصيلية عن الخطأ في Spark Application Logs.

استجابة طلب الكتابة

بشكل افتراضي، تتم طباعة استجابة الكتابة إلى إخراج الخلية. عند الفشل، توضع علامة على الخلية الحالية على أنها في حالة فشل، وسيتم تعطيل عمليات تنفيذ الخلايا اللاحقة. النهج الآخر هو تمرير خيار مؤشر معاودة الاتصال إلى الأسلوب synapsesql. سيوفر مؤشر معاودة الاتصال وصولاً برمجيًا إلى استجابة الكتابة.

اعتبارات أخرى

  • عند القراءة من جداول Azure Synapse Dedicated SQL Pool:
    • ضع في اعتبارك تطبيق عوامل التصفية الضرورية على DataFrame للاستفادة من ميزة تنقيح الأعمدة في Connector.
    • لا يدعم سيناريو القراءة العبارة TOP(n-rows)، عند تأطير عبارات الاستعلام SELECT. اختيار الحد من البيانات هو استخدام عبارة limit(.) لـ DataFrame.
  • عند الكتابة إلى جداول تجمع Azure Synapse Dedicated SQL:
    • بالنسبة إلى أنواع الجداول الداخلية:
      • يتم إنشاء الجداول مع توزيع البيانات بطريقة ROUND_ROBIN.
      • يتم استنتاج أنواع الأعمدة من DataFrame التي من شأنها قراءة البيانات من المصدر. تُعيّن أعمدة السلسلة إلى NVARCHAR(4000).
    • بالنسبة إلى أنواع الجداول الخارجية:
      • يشتق التوازي الأوليّ لـ DataFrame تنظيم البيانات للجدول الخارجي.
      • يتم استنتاج أنواع الأعمدة من DataFrame التي من شأنها قراءة البيانات من المصدر.
    • يمكن تحقيق توزيع أفضل للبيانات عبر المنفذين عن طريق ضبط spark.sql.files.maxPartitionBytes ومعلمة repartition لـ DataFrame.
    • عند كتابة مجموعات بيانات كبيرة، من المهم مراعاة تأثير إعداد مستوى أداء DWU الذي يحدّ من حجم العملية.
  • راقب اتجاهات استخدام Azure Data Lake Storage Gen2 لاكتشاف سلوكيات التقييد التي يمكن أن تؤثر على أداء القراءة والكتابة.