Partager via


Connecteur de pools SQL dédiés Azure Synapse pour Apache Spark

Introduction

Le Connecteur de pools SQL dédié Azure Synapse pour Apache Spark dans Azure Synapse Analytics permet de transférer efficacement des jeux de données volumineux entre le runtime Apache Spark et le pool SQL dédié. Le connecteur est fourni en tant que bibliothèque par défaut avec l’espace de travail Azure Synapse. Le connecteur est implémenté à l’aide du langage Scala. Le connecteur prend en charge Scala et Python. Pour utiliser le Connecteur avec d’autres choix de langage de notebook, utilisez la commande magique Spark - %%spark.

Globalement, le connecteur offre les fonctionnalités suivantes :

  • Lire un pool SQL dédié Azure Synapse :
    • Lire des grands jeux de données à partir de tables de pool SQL dédié Synapse (internes et externes) et de vues.
    • Prise en charge complète du pushdown des prédicats, où les filtres sur les tableaux sont mappés au pushdown des prédicats SQL correspondant.
    • Prise en charge du nettoyage de colonne.
    • Prise en charge du pushdown.
  • Écrire dans un pool SQL dédié Azure Synapse :
    • Ingérer de grandes quantités de données de volume dans des types de tables internes et externes.
    • Prend en charge les préférences de mode d’enregistrement de DataFrame suivantes :
      • Append
      • ErrorIfExists
      • Ignore
      • Overwrite
    • L’écriture dans un type de table externe prend en charge le format de fichier texte Parquet et délimité (par exemple, CSV).
    • Pour écrire des données dans des tables internes, le connecteur utilise désormais l’instruction COPY à la place de l’approche CETAS/CTAS.
    • Améliorations pour optimiser les performances du débit d’écriture de bout en bout.
    • Introduit un gestionnaire d’appel différé facultatif (un argument de fonction Scala) que les clients peuvent utiliser pour recevoir des métriques après écriture.
      • Citons, par exemple, le nombre d’enregistrements, la durée d’exécution d’une certaine action et la raison de l’échec.

Approche de l’orchestration

Lire

A high-level data flow diagram to describe the connector's orchestration of a read request.

Écrire

A high-level data flow diagram to describe the connector's orchestration of a write request.

Conditions préalables

Les conditions préalables, telles que la configuration des ressources Azure requises et les étapes de configuration, sont décrites dans cette section.

Ressources Azure

Examinez et configurez les ressources Azure dépendantes suivantes :

Préparation de la base de données

Connectez-vous à la base de données du Pool SQL dédié Synapse et exécutez les instructions de configuration suivantes :

  • Créez un utilisateur de base de données mappé à l’identité d’utilisateur Microsoft Entra utilisée pour se connecter à l’espace de travail Azure Synapse.

    CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;      
    
  • Créez un schéma dans lequel les tables seront définies, de sorte que le connecteur puisse écrire et lire les tables respectives.

    CREATE SCHEMA [<schema_name>];
    

Authentification

Authentification basée sur Microsoft Entra ID

L’authentification basée sur Microsoft Entra ID est une approche d’authentification intégrée. L’utilisateur doit se connecter à l’espace de travail Azure Synapse Analytics.

Authentification de base

Une approche d’authentification de base nécessite que l’utilisateur configure les options username et password. Pour en savoir plus sur les paramètres de configuration appropriés pour la lecture et l’écriture de tables dans un pool SQL dédié Azure Synapse, reportez-vous à la section Options de configuration.

Autorisation

Azure Data Lake Storage Gen2

il existe deux façons d’accorder des autorisations d’accès à un compte de stockage Azure Data Lake Storage Gen2 :

  • Rôle du contrôle d’accès en fonction du rôle : Rôle Contributeur de données Blob du stockage
    • L’attribution de Storage Blob Data Contributor Role octroie aux utilisateurs des autorisations de lecture, d’écriture et de suppression dans les conteneurs Azure Storage Blob.
    • RBAC offre une approche de contrôle grossière au niveau du conteneur.
  • Listes de contrôle d’accès (ACL)
    • L’approche ACL permet d’obtenir des contrôles précis sur des chemins d’accès et/ou des fichiers spécifiques dans un dossier donné.
    • Les vérifications des listes de contrôle d’accès ne sont pas appliquées si l’utilisateur a déjà reçu des autorisations à l’aide de l’approche RBAC.
    • Il existe deux principaux types d’autorisations ACL :
      • Autorisations d’accès (appliquées à un niveau ou objet spécifique).
      • Autorisations par défaut (appliquées automatiquement pour tous les objets enfants au moment de leur création).
    • Les types d’autorisations incluent :
      • Execute permet de traverser ou de parcourir les hiérarchies de dossiers.
      • Read permet la lecture.
      • Write permet l’écriture.
    • Il est important de configurer les listes de contrôle d’accès de sorte que le connecteur puisse écrire et lire à partir des emplacements de stockage.

Remarque

  • Si vous souhaitez exécuter des notebooks à l’aide de pipelines de l’espace de travail Synapse, vous devez également accorder les autorisations d’accès listées ci-dessus à l’identité managée par défaut de l’espace de travail Synapse. Le nom d’identité managée par défaut de l’espace de travail est identique au nom de l’espace de travail.

  • Pour utiliser l’espace de travail Synapse avec des comptes de stockage sécurisés, un point de terminaison privé managé doit être configuré à partir du notebook. Le point de terminaison privé managé doit être approuvé à partir de la section Private endpoint connections du compte de stockage ADLS Gen2, dans le volet Networking.

Azure Synapse : pool SQL dédié

Pour permettre une interaction réussie avec le Pool SQL dédié Azure Synapse, l’autorisation suivante est nécessaire, sauf si vous êtes un utilisateur également configuré en tant qu’Active Directory Admin sur le point de terminaison SQL dédié :

  • Scénario de lecture

    • Accordez à l’utilisateur le rôle db_exporter en utilisant la procédure stockée système sp_addrolemember.

      EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
      
  • Scénario d’écriture

    • Le connecteur utilise la commande de copie pour écrire des données de mise en lots dans l’emplacement géré de la table interne.
      • Configurez les autorisations requises décrites ici.

      • Voici un extrait de code d’accès rapide :

        --Make sure your user has the permissions to CREATE tables in the [dbo] schema
        GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com];
        GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions
        GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has INSERT permissions on the target table
        GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
        

Documentation de l’API

Connecteur de pools SQL dédiés Azure Synapse pour Apache Spark - Documentation de l’API.

Options de configuration

Pour démarrer et orchestrer correctement l’opération de lecture ou d’écriture, le Connecteur attend certains paramètres de configuration. La définition d’objet com.microsoft.spark.sqlanalytics.utils.Constants fournit une liste de constantes standardisées pour chaque clé de paramètre.

Voici la liste des options de configuration en fonction du scénario d’utilisation :

  • Lire à l’aide de l’authentification basée sur Microsoft Entra ID
    • Les informations d’identification sont automatiquement mappées, et l’utilisateur n’est pas obligé de fournir des options de configuration spécifiques.
    • L’argument de nom de table en trois parties sur la méthode synapsesql est requis pour lire à partir de la table correspondante dans le Pool SQL dédié Azure Synapse.
  • Lire en utilisant l’authentification de base
    • Point de terminaison SQL dédié Azure Synapse
      • Constants.SERVER : point de terminaison de Pool SQL dédié Azure Synapse (nom de domaine complet du serveur)
      • Constants.USER : nom d’utilisateur SQL.
      • Constants.PASSWORD : mot de passe d’utilisateur SQL.
    • Point de terminaison Azure Data Lake Storage (Gen 2) - Dossiers de préproduction
      • Constants.DATA_SOURCE : le chemin de stockage défini sur le paramètre d’emplacement de la source de données est utilisé pour la préproduction des données.
  • Écrire à l’aide de l’authentification basée sur Microsoft Entra ID
    • Point de terminaison SQL dédié Azure Synapse
      • Par défaut, le Connecteur déduit le point de terminaison SQL dédié Synapse à l’aide du nom de base de données défini sur le paramètre de nom de table en trois parties de la méthode synapsesql.
      • Les utilisateurs peuvent également utiliser l’option Constants.SERVER pour spécifier le point de terminaison SQL. Vérifiez que le point de terminaison héberge la base de données correspondante avec le schéma respectif.
    • Point de terminaison Azure Data Lake Storage (Gen 2) - Dossiers de préproduction
      • Pour le type de table interne :
        • Configurez l’option Constants.TEMP_FOLDER ou Constants.DATA_SOURCE.
        • Si l’utilisateur a choisi de fournir l’option Constants.DATA_SOURCE, le dossier de préproduction est dérivé à l’aide de la valeur location de la source de données.
        • Si les deux sont fournies, la valeur de l’option Constants.TEMP_FOLDER est utilisée.
        • En l’absence d’une option de dossier de préproduction, le Connecteur en dérive un en fonction de la configuration du runtime : spark.sqlanalyticsconnector.stagingdir.prefix.
      • Pour le type de table externe :
        • Constants.DATA_SOURCE est une option de configuration requise.
        • Le connecteur utilise le chemin de stockage défini sur le paramètre d’emplacement de la source de données en association avec l’argument location de la méthode synapsesql et dérive le chemin absolu pour conserver les données de la table externe.
        • Si l’argument location de la méthode synapsesql n’est pas spécifié, le connecteur dérive la valeur d’emplacement sous la forme <base_path>/dbName/schemaName/tableName.
  • Écrire en utilisant l’authentification de base
    • Point de terminaison SQL dédié Azure Synapse
      • Constants.SERVER : point de terminaison de Pool SQL dédié Azure Synapse (nom de domaine complet du serveur).
      • Constants.USER : nom d’utilisateur SQL.
      • Constants.PASSWORD : mot de passe d’utilisateur SQL.
      • Constants.STAGING_STORAGE_ACCOUNT_KEY associé au compte de stockage qui héberge Constants.TEMP_FOLDERS (types de tables internes uniquement) ou Constants.DATA_SOURCE.
    • Point de terminaison Azure Data Lake Storage (Gen 2) - Dossiers de préproduction
      • Les informations d’identification de l’authentification de base SQL ne s’appliquent pas à l’accès aux points de terminaison de stockage.
      • Par conséquent, veillez à attribuer les autorisations d’accès au stockage pertinentes, comme décrit dans la section Azure Data Lake Storage Gen2.

Modèles de code

Cette section présente les modèles de code de référence pour décrire comment utiliser et appeler le connecteur de pool SQL dédié Azure Synapse pour Apache Spark.

Notes

Utilisation du connecteur dans Python

  • Le connecteur est pris en charge uniquement dans Python pour Spark 3. Pour Spark 2.4 (non pris en charge), nous pouvons utiliser l’API de connecteur Scala pour interagir avec le contenu d’un DataFrame dans PySpark en utilisant DataFrame.createOrReplaceTempView ou DataFrame.createOrReplaceGlobalTempView. Consultez la section Utilisation de données matérialisées entre les cellules.
  • Le descripteur de rappel n’est pas disponible dans Python.

Lire un pool SQL dédié Azure Synapse

Demande de lecture - Signature de la méthode synapsesql

synapsesql(tableName:String="") => org.apache.spark.sql.DataFrame

Lire à partir d’une table à l’aide de l’authentification basée sur Microsoft Entra ID

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)

//Show contents of the dataframe
dfToReadFromTable.show()

Lire à partir d’une requête à l’aide de l’authentification basée sur Microsoft Entra ID

Remarque

Restrictions pendant la lecture dans une requête :

  • Le nom de table et la requête ne peuvent pas être spécifiés simultanément.
  • Seules certaines requêtes sont autorisées. Les requêtes SQL DDL et DML ne sont pas autorisées.
  • Les options de sélection et de filtre sur le dataframe ne font pas l’objet d’un pushdown sur le pool SQL dédié quand une requête est spécifiée.
  • La lecture dans une requête est disponible seulement dans Spark 3.1 et 3.2.
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._


// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
    // Name of the SQL Dedicated Pool or database where to run the query
    // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     // Name of the SQL Dedicated Pool or database where to run the query
     // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>")
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")


//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Lire dans une table en utilisant l’authentification de base

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the table will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)
    

//Show contents of the dataframe
dfToReadFromTable.show()

Lire dans une requête en utilisant l’authentification de base

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")

// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
    

//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Écrire dans un pool SQL dédié Azure Synapse

Demande d’écriture - Signature de la méthode synapsesql

La signature de méthode pour la version de connecteur générée pour Spark 2.4.8 compte un argument de moins que celle appliquée à la version Spark 3.1.2. Voici les deux signatures de méthode :

  • Pool Spark version 2.4.8
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None):Unit
  • Pool Spark version 3.1.2
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None,
           callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit

Écrire à l’aide de l’authentification basée sur Microsoft Entra ID

Voici un exemple de modèle de code complet qui explique comment utiliser le Connecteur pour les scénarios d’écriture :

//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"

//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")

//Initialize DataFrame that reads CSV data from a given source 
val readDF:DataFrame=spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(1000) //Reads first 1000 rows from the source CSV input.

//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
//    1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
//    2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab. 
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                            Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
    (feedback: Map[String, Any], errorState: Option[Throwable]) => {
    println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
    errorDuringWrite = errorState
}

//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
    write.
    //Configure required configurations.
    options(writeOptionsWithAADAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite).
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL, 
                //Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
                location = None, 
                //Optional parameter to receive a callback.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get

Écrire en utilisant l’authentification de base

L’extrait de code suivant remplace la définition d’écriture décrite dans la section Écrire en utilisant l’authentification basée sur Microsoft Entra ID pour envoyer une requête d’écriture à l’aide de l’approche d’authentification de base SQL :

//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                           //Set database user name
                                           Constants.USER -> "<user_name>",
                                           //Set database user's password
                                           Constants.PASSWORD -> "<user_password>",
                                           //Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
                                           Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
                                           //To be used only when writing to internal tables. Storage path will be used for data staging.
                                           Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Configure and submit the request to write to Synapse Dedicated SQL Pool. 
readDF.
    write.
    options(writeOptionsWithBasicAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite). 
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL,
                //Not required for writing to an internal table 
                location = None,
                //Optional parameter.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

Dans une approche d’authentification de base, pour lire des données à partir d’un chemin de stockage source, d’autres options de configuration sont nécessaires. L’extrait de code suivant fournit un exemple de lecture d’une source de données Azure Data Lake Storage Gen2 à l’aide des informations d’identification du principal de service :

//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
                                "delimiter"->",", 
                                "fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
                                s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
                                s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" -> 
                                    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                                "fs.azure.account.oauth2.client.id" -> s"$spnClientId",
                                "fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
                                "fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
                                "fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
                                "fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(100)

Modes d’enregistrement de DataFrame pris en charge

Les modes d’enregistrement suivants sont pris en charge lors de l’écriture de données sources dans une table de destination dans un pool SQL dédié Azure Synapse :

  • ErrorIfExists (mode d’enregistrement par défaut)
    • Si la table de destination existe, l’écriture est abandonnée avec une exception renvoyée à l’appelé. Sinon, une nouvelle table est créée avec les données des dossiers intermédiaires.
  • Ignorer
    • Si la table de destination existe, l’écriture ignore la demande d’écriture sans retourner d’erreur. Sinon, une nouvelle table est créée avec les données des dossiers intermédiaires.
  • Remplacer
    • Si la table de destination existe, les données existantes dans la destination sont remplacées par les données des dossiers intermédiaires. Sinon, une nouvelle table est créée avec les données des dossiers intermédiaires.
  • Ajouter
    • Si la table de destination existe, les nouvelles données sont ajoutées à celle-ci. Sinon, une nouvelle table est créée avec les données des dossiers intermédiaires.

Descripteur de rappel de demande d’écriture

Les nouvelles modifications de l’API de chemin d’accès d’écriture ont introduit une fonctionnalité expérimentale pour fournir au client un mappage clé->valeur de métriques postérieures à l’écriture. Les clés des métriques sont définies dans la nouvelle définition d’objet : Constants.FeedbackConstants. Vous pouvez récupérer les métriques sous la forme d’une chaîne JSON en passant le descripteur de rappel (une Scala Function). Voici la signature de fonction :

//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit

Voici quelques métriques notables (présentées en casse mixte) :

  • WriteFailureCause
  • DataStagingSparkJobDurationInMilliseconds
  • NumberOfRecordsStagedForSQLCommit
  • SQLStatementExecutionDurationInMilliseconds
  • rows_processed

Voici un exemple de chaîne JSON avec des métriques postérieures à l’écriture :

{
 SparkApplicationId -> <spark_yarn_application_id>,
 SQLStatementExecutionDurationInMilliseconds -> 10113,
 WriteRequestReceivedAtEPOCH -> 1647523790633,
 WriteRequestProcessedAtEPOCH -> 1647523808379,
 StagingDataFileSystemCheckDurationInMilliseconds -> 60,
 command -> "COPY INTO [schema_name].[table_name] ...",
 NumberOfRecordsStagedForSQLCommit -> 100,
 DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
 SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
 DataStagingSparkJobDurationInMilliseconds -> 5252,
 rows_processed -> 100,
 SaveModeApplied -> TRUNCATE_COPY,
 DurationInMillisecondsToValidateFileFormat -> 75,
 status -> Completed,
 SparkApplicationName -> <spark_application_name>,
 ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
 request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
 StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
 JDBCConfigurationsSetupAtEPOCH -> 193,
 StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
 FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
 SchemaInferenceCheckDurationInMilliseconds -> 91,
 SaveModeRequested -> Overwrite,
 DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
 DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}

Autres exemples de code

Utilisation de données matérialisées entre les cellules

Vous pouvez utiliser createOrReplaceTempView de Spark DataFrame pour accéder aux données extraites dans une autre cellule, en inscrivant une vue temporaire.

  • Cellule où les données sont extraites (avec, par exemple, la préférence de langage de notebook Scala)
    //Necessary imports
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.SaveMode
    import com.microsoft.spark.sqlanalytics.utils.Constants
    import org.apache.spark.sql.SqlAnalyticsConnector._
    
    //Configure options and read from Synapse Dedicated SQL Pool.
    val readDF = spark.read.
        //Set Synapse Dedicated SQL End Point name.
        option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
        //Set database user name.
        option(Constants.USER, "<user_name>").
        //Set database user's password. 
        option(Constants.PASSWORD, "<user_password>").
        //Set name of the data source definition that is defined with database scoped credentials.
        option(Constants.DATA_SOURCE,"<data_source_name>").
        //Set the three-part table name from which the read must be performed.
        synapsesql("<database_name>.<schema_name>.<table_name>").
        //Optional - specify number of records the DataFrame would read.
        limit(10)
    //Register the temporary view (scope - current active Spark Session)
    readDF.createOrReplaceTempView("<temporary_view_name>")
  • À présent, remplacez la préférence de langage sur le notebook par PySpark (Python) et extrayez les données de la vue inscrite <temporary_view_name>
        spark.sql("select * from <temporary_view_name>").show()

Gestion des réponses

L’appel de synapsesql a deux états finaux possibles : Réussite ou État d’échec. Cette section décrit comment gérer la réponse à une demande pour chaque scénario.

Réponse à une demande de lecture

Une fois l’opération terminée, l’extrait de la réponse de lecture s’affiche dans la sortie de la cellule. Tout échec dans la cellule active annule également les exécutions de cellules suivantes. Des informations détaillées sur les erreurs sont disponibles dans les journaux des applications Spark.

Réponse à une demande d’écriture

Par défaut, une réponse d’écriture est imprimée dans la sortie de la cellule. En cas d’échec, la cellule active est marquée comme ayant échoué et les exécutions de cellules suivantes sont abandonnées. L’autre approche consiste à passer l’option de descripteur de rappel à la méthode synapsesql. Le descripteur de rappel fournit un accès programmatique à la réponse d’écriture.

Autres considérations

  • Lors de la lecture à partir des tables de pool SQL dédié Azure Synapse :
    • Envisagez d’appliquer les filtres nécessaires sur le DataFrame pour tirer parti de la fonctionnalité de nettoyage des colonnes du Connecteur.
    • Un scénario de lecture ne prend pas en charge la clause TOP(n-rows) lors de la délimitation des instructions de requête SELECT. Le choix de limiter les données consiste à utiliser la clause limit(.) de DataFrame.
  • Lors de l’écriture dans des tables de pool SQL dédié Azure Synapse :
    • Pour les types de tables internes :
      • Les tables sont créées avec une distribution de données ROUND_ROBIN.
      • Les types de colonnes sont déduits du DataFrame qui lit les données à partir de la source. Les colonnes de chaîne sont mappées à NVARCHAR(4000).
    • Pour les types de tables externes :
      • Le parallélisme initial du DataFrame pilote l’organisation des données pour la table externe.
      • Les types de colonnes sont déduits du DataFrame qui lit les données à partir de la source.
    • Vous pouvez obtenir une meilleure distribution des données entre exécuteurs en réglant spark.sql.files.maxPartitionBytes et le paramètre repartition de DataFrame.
    • Lors de l’écriture de jeux de données volumineux, il est important de tenir compte de l’impact du paramètre Niveau de performance DWU qui limite la taille des transactions.
  • Supervisez les tendances d’utilisation d’Azure Data Lake Storage Gen2 pour repérer les comportements de limitation qui peuvent avoir un impact sur les performances de lecture et d’écriture.

Références