Conector do Conjunto de SQL Dedicado do Azure Synapse para Apache Spark

Introdução

O Azure Synapse Dedicated SQL Pool Connector for Apache Spark no Azure Synapse Analytics permite a transferência eficiente de grandes conjuntos de dados entre o tempo de execução do Apache Spark e o pool SQL dedicado. O conector é enviado como uma biblioteca predefinida com a Área de Trabalho do Azure Synapse. O conector é implementado usando Scala linguagem. O conector suporta Scala e Python. Para usar o Conector com outras opções de idioma do bloco de anotações, use o comando mágico do Spark - %%spark.

Em um alto nível, o conector fornece os seguintes recursos:

  • Leia a partir do Azure Synapse Dedicated SQL Pool:
    • Leia grandes conjuntos de dados de Synapse Dedicated SQL Pool Tables (Internal and External) e exibições.
    • Suporte abrangente a push down de predicados, onde os filtros no DataFrame são mapeados para o push down do predicado SQL correspondente.
    • Suporte para poda de colunas.
    • Suporte para push down de consulta.
  • Escreva no Azure Synapse Dedicated SQL Pool:
    • Ingerir dados de grande volume para tipos de tabelas internas e externas.
    • Suporta as seguintes preferências do modo de salvamento DataFrame:
      • Append
      • ErrorIfExists
      • Ignore
      • Overwrite
    • O tipo Write to External Table suporta o formato de arquivo Parquet e Delimited Text (exemplo - CSV).
    • Para gravar dados em tabelas internas, o conector agora usa a instrução COPY em vez da abordagem CETAS/CTAS.
    • Aprimoramentos para otimizar o desempenho completo da taxa de transferência de gravação.
    • Introduz um identificador de retorno de chamada opcional (um argumento de função Scala) que os clientes podem usar para receber métricas pós-gravação.
      • Alguns exemplos incluem - número de registros, duração para concluir determinada ação e motivo da falha.

Abordagem de orquestração

Leitura

A high-level data flow diagram to describe the connector's orchestration of a read request.

Escrita

A high-level data flow diagram to describe the connector's orchestration of a write request.

Pré-requisitos

Os pré-requisitos, como a configuração dos recursos necessários do Azure e as etapas para configurá-los, são discutidos nesta seção.

Recursos do Azure

Analise e configure os seguintes Recursos do Azure dependentes:

Preparar a base de dados

Conecte-se ao banco de dados Synapse Dedicated SQL Pool e execute as seguintes instruções de instalação:

  • Crie um usuário de banco de dados mapeado para a Identidade de usuário do Microsoft Entra usada para entrar no Espaço de Trabalho Sinapse do Azure.

    CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;      
    
  • Crie um esquema no qual as tabelas serão definidas, de modo que o Connector possa gravar e ler com êxito as respetivas tabelas.

    CREATE SCHEMA [<schema_name>];
    

Autenticação

Autenticação baseada no Microsoft Entra ID

A autenticação baseada no Microsoft Entra ID é uma abordagem de autenticação integrada. O usuário deve entrar com êxito no Espaço de Trabalho do Azure Synapse Analytics.

Autenticação básica

Uma abordagem de autenticação básica requer que o usuário configure username e password opções. Consulte a seção - Opções de configuração para saber mais sobre parâmetros de configuração relevantes para leitura e gravação em tabelas no Azure Synapse Dedicated SQL Pool.

Autorização

Azure Data Lake Storage Gen2 (Armazenamento do Azure Data Lake Gen2)

Há duas maneiras de conceder permissões de acesso ao Azure Data Lake Storage Gen2 - Conta de Armazenamento:

  • Função de Controle de Acesso com base em função - função de Colaborador de Dados de Blob de Armazenamento
    • A atribuição do concede ao Storage Blob Data Contributor Role Usuário permissões para ler, gravar e excluir dos Contêineres de Blob de Armazenamento do Azure.
    • O RBAC oferece uma abordagem de controle grosseiro no nível do contêiner.
  • Listas de controle de acesso (ACL)
    • A abordagem ACL permite controles refinados sobre caminhos e/ou arquivos específicos em uma determinada pasta.
    • As verificações de ACL não serão aplicadas se o Usuário já tiver recebido permissões usando a abordagem RBAC.
    • Existem dois tipos amplos de permissões de ACL:
      • Permissões de acesso (aplicadas em um nível ou objeto específico).
      • Permissões padrão (aplicadas automaticamente para todos os objetos filho no momento de sua criação).
    • Os tipos de permissões incluem:
      • Execute Permite a capacidade de percorrer ou navegar nas hierarquias de pastas.
      • Read permite a capacidade de leitura.
      • Write permite a capacidade de escrever.
    • É importante configurar ACLs para que o conector possa gravar e ler com êxito nos locais de armazenamento.

Nota

  • Se quiser executar blocos de anotações usando pipelines do Synapse Workspace, você também deve conceder permissões de acesso listadas acima para a identidade gerenciada padrão do Synapse Workspace. O nome de identidade gerenciado padrão do espaço de trabalho é igual ao nome do espaço de trabalho.

  • Para usar o espaço de trabalho Synapse com contas de armazenamento seguras, um ponto de extremidade privado gerenciado deve ser configurado a partir do bloco de anotações. O ponto de extremidade privado gerenciado deve ser aprovado na seção da conta Private endpoint connections de armazenamento ADLS Gen2 no Networking painel.

Azure Synapse Dedicated SQL Pool

Para habilitar a interação bem-sucedida com o SQL Pool Dedicado do Azure Synapse, a seguinte autorização é necessária, a menos que você seja um usuário também configurado como um Active Directory Admin no Ponto de Extremidade SQL Dedicado:

  • Ler cenário

    • Conceda ao usuário db_exporter usando o procedimento sp_addrolememberarmazenado do sistema .

      EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
      
  • Cenário de escrita

    • O Connector usa o comando COPY para gravar dados do preparo no local gerenciado da tabela interna.
      • Configure as permissões necessárias descritas aqui.

      • A seguir está um trecho de acesso rápido do mesmo:

        --Make sure your user has the permissions to CREATE tables in the [dbo] schema
        GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com];
        GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions
        GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has INSERT permissions on the target table
        GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
        

Documentação da API

Azure Synapse Dedicated SQL Pool Connector for Apache Spark - Documentação da API.

Opções de configuração

Para inicializar e orquestrar com êxito a operação de leitura ou gravação, o Conector espera determinados parâmetros de configuração. A definição de objeto - com.microsoft.spark.sqlanalytics.utils.Constants fornece uma lista de constantes padronizadas para cada chave de parâmetro.

A seguir está a lista de opções de configuração com base no cenário de uso:

  • Ler usando a autenticação baseada no Microsoft Entra ID
    • As credenciais são mapeadas automaticamente e o usuário não precisa fornecer opções de configuração específicas.
    • O argumento de nome de tabela de três partes no método é necessário para ler a respetiva tabela no synapsesql Pool SQL Dedicado do Azure Synapse.
  • Ler usando autenticação básica
    • Ponto de extremidade SQL dedicado do Azure Synapse
      • Constants.SERVER - Synapse Dedicated SQL Pool End Point (FQDN do servidor)
      • Constants.USER - Nome de usuário SQL.
      • Constants.PASSWORD - Senha de usuário SQL.
    • Ponto Final do Armazenamento do Azure Data Lake (Gen 2) - Pastas de Preparo
      • Constants.DATA_SOURCE - O caminho de armazenamento definido no parâmetro de localização da fonte de dados é usado para o preparo de dados.
  • Escrever usando a autenticação baseada no Microsoft Entra ID
    • Ponto de extremidade SQL dedicado do Azure Synapse
      • Por padrão, o Connector infere o ponto de extremidade Synapse Dedicated SQL usando o nome do banco de dados definido no synapsesql parâmetro de nome de tabela de três partes do método.
      • Como alternativa, os usuários podem usar a Constants.SERVER opção para especificar o ponto de extremidade sql. Verifique se o ponto final hospeda o banco de dados correspondente com o respetivo esquema.
    • Ponto Final do Armazenamento do Azure Data Lake (Gen 2) - Pastas de Preparo
      • Para o tipo de tabela interna:
        • Configure uma ou Constants.TEMP_FOLDERConstants.DATA_SOURCE opção.
        • Se o usuário optar por fornecer Constants.DATA_SOURCE a opção, a pasta de preparo será derivada usando o location valor do DataSource.
        • Se ambos forem fornecidos, o valor da Constants.TEMP_FOLDER opção será usado.
        • Na ausência de uma opção de pasta de preparo, o Conector derivará uma com base na configuração de tempo de execução - spark.sqlanalyticsconnector.stagingdir.prefix.
      • Para o tipo de tabela externa:
        • Constants.DATA_SOURCE é uma opção de configuração necessária.
        • O conector usa o caminho de armazenamento definido no parâmetro location da fonte de dados em combinação com o argumento para o método e deriva o locationsynapsesql caminho absoluto para persistir os dados da tabela externa.
        • Se o argumento para synapsesql o método não for especificado, o conector derivará o location valor do local como <base_path>/dbName/schemaName/tableName.
  • Escrever usando autenticação básica
    • Ponto de extremidade SQL dedicado do Azure Synapse
      • Constants.SERVER - - Synapse Dedicated SQL Pool End Point (FQDN do servidor).
      • Constants.USER - Nome de usuário SQL.
      • Constants.PASSWORD - Senha de usuário SQL.
      • Constants.STAGING_STORAGE_ACCOUNT_KEY associada à Conta de Armazenamento que hospeda Constants.TEMP_FOLDERS (somente tipos de tabela interna) ou Constants.DATA_SOURCE.
    • Ponto Final do Armazenamento do Azure Data Lake (Gen 2) - Pastas de Preparo
      • As credenciais de autenticação básica do SQL não se aplicam aos pontos finais de armazenamento de acesso.
      • Portanto, certifique-se de atribuir permissões de acesso ao armazenamento relevantes, conforme descrito na seção Azure Data Lake Storage Gen2.

Modelos de código

Esta seção apresenta modelos de código de referência para descrever como usar e invocar o Azure Synapse Dedicated SQL Pool Connector for Apache Spark.

Nota

Usando o conector em Python-

  • O conector é suportado em Python apenas para Spark 3. Para o Spark 2.4 (sem suporte), podemos usar a API do conector Scala para interagir com o conteúdo de um DataFrame no PySpark usando DataFrame.createOrReplaceTempView ou DataFrame.createOrReplaceGlobalTempView. Consulte a Secção - Utilização de dados materializados entre células.
  • O identificador de retorno de chamada não está disponível em Python.

Leia a partir do Azure Synapse Dedicated SQL Pool

Solicitação de leitura - synapsesql assinatura do método

synapsesql(tableName:String="") => org.apache.spark.sql.DataFrame

Ler a partir de uma tabela usando a autenticação baseada no Microsoft Entra ID

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)

//Show contents of the dataframe
dfToReadFromTable.show()

Ler a partir de uma consulta usando a autenticação baseada no Microsoft Entra ID

Nota

Restrições durante a leitura da consulta:

  • O nome da tabela e a consulta não podem ser especificados ao mesmo tempo.
  • Só são permitidas consultas selecionadas. DDL e DML SQLs não são permitidos.
  • As opções de seleção e filtro no dataframe não são enviadas para o pool dedicado SQL quando uma consulta é especificada.
  • A leitura de uma consulta só está disponível no Spark 3.1 e 3.2.
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._


// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
    // Name of the SQL Dedicated Pool or database where to run the query
    // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     // Name of the SQL Dedicated Pool or database where to run the query
     // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>")
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")


//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Ler a partir de uma tabela usando autenticação básica

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the table will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)
    

//Show contents of the dataframe
dfToReadFromTable.show()

Ler a partir de uma consulta usando autenticação básica

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")

// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
    

//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Escrever no Azure Synapse Dedicated SQL Pool

Write Request - synapsesql assinatura do método

A assinatura do método para a versão do conector criada para o Spark 2.4.8 tem um argumento a menos do que o aplicado à versão do Spark 3.1.2. A seguir estão as duas assinaturas de método:

  • Spark Pool Versão 2.4.8
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None):Unit
  • Spark Pool Versão 3.1.2
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None,
           callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit

Escrever usando a autenticação baseada no Microsoft Entra ID

A seguir está um modelo de código abrangente que descreve como usar o conector para cenários de gravação:

//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"

//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")

//Initialize DataFrame that reads CSV data from a given source 
val readDF:DataFrame=spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(1000) //Reads first 1000 rows from the source CSV input.

//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
//    1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
//    2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab. 
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                            Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
    (feedback: Map[String, Any], errorState: Option[Throwable]) => {
    println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
    errorDuringWrite = errorState
}

//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
    write.
    //Configure required configurations.
    options(writeOptionsWithAADAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite).
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL, 
                //Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
                location = None, 
                //Optional parameter to receive a callback.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get

Escrever usando autenticação básica

O trecho de código a seguir substitui a definição de gravação descrita na seção Gravar usando autenticação baseada em ID do Microsoft Entra, para enviar solicitação de gravação usando a abordagem de autenticação básica do SQL:

//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                           //Set database user name
                                           Constants.USER -> "<user_name>",
                                           //Set database user's password
                                           Constants.PASSWORD -> "<user_password>",
                                           //Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
                                           Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
                                           //To be used only when writing to internal tables. Storage path will be used for data staging.
                                           Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Configure and submit the request to write to Synapse Dedicated SQL Pool. 
readDF.
    write.
    options(writeOptionsWithBasicAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite). 
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL,
                //Not required for writing to an internal table 
                location = None,
                //Optional parameter.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

Em uma abordagem de autenticação básica, para ler dados de um caminho de armazenamento de origem, outras opções de configuração são necessárias. O trecho de código a seguir fornece um exemplo para ler de uma fonte de dados do Azure Data Lake Storage Gen2 usando credenciais da Entidade de Serviço:

//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
                                "delimiter"->",", 
                                "fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
                                s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
                                s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" -> 
                                    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                                "fs.azure.account.oauth2.client.id" -> s"$spnClientId",
                                "fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
                                "fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
                                "fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
                                "fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(100)

Modos de salvamento DataFrame suportados

Os seguintes modos de salvamento são suportados ao gravar dados de origem em uma tabela de destino no Pool SQL Dedicado do Azure Synapse:

  • ErrorIfExists (modo de gravação padrão)
    • Se a tabela de destino existir, a gravação será anulada com uma exceção retornada ao destinatário. Caso contrário, uma nova tabela é criada com dados das pastas de preparo.
  • Ignorar
    • Se a tabela de destino existir, a gravação ignorará a solicitação de gravação sem retornar um erro. Caso contrário, uma nova tabela é criada com dados das pastas de preparo.
  • Substituir
    • Se a tabela de destino existir, os dados existentes no destino serão substituídos por dados das pastas de preparo. Caso contrário, uma nova tabela é criada com dados das pastas de preparo.
  • Anexar
    • Se a tabela de destino existir, os novos dados serão anexados a ela. Caso contrário, uma nova tabela é criada com dados das pastas de preparo.

Identificador de retorno de chamada de solicitação de gravação

As novas alterações na API de caminho de gravação introduziram um recurso experimental para fornecer ao cliente um mapa chave-valor de métricas pós-gravação>. As chaves para as métricas são definidas na nova definição de objeto - Constants.FeedbackConstants. As métricas podem ser recuperadas como uma cadeia de caracteres JSON passando o identificador de retorno de chamada (a Scala Function). Segue-se a assinatura da função:

//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit

A seguir estão algumas métricas notáveis (apresentadas no caso do camelo):

  • WriteFailureCause
  • DataStagingSparkJobDurationInMilliseconds
  • NumberOfRecordsStagedForSQLCommit
  • SQLStatementExecutionDurationInMilliseconds
  • rows_processed

A seguir está um exemplo de cadeia de caracteres JSON com métricas pós-gravação:

{
 SparkApplicationId -> <spark_yarn_application_id>,
 SQLStatementExecutionDurationInMilliseconds -> 10113,
 WriteRequestReceivedAtEPOCH -> 1647523790633,
 WriteRequestProcessedAtEPOCH -> 1647523808379,
 StagingDataFileSystemCheckDurationInMilliseconds -> 60,
 command -> "COPY INTO [schema_name].[table_name] ...",
 NumberOfRecordsStagedForSQLCommit -> 100,
 DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
 SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
 DataStagingSparkJobDurationInMilliseconds -> 5252,
 rows_processed -> 100,
 SaveModeApplied -> TRUNCATE_COPY,
 DurationInMillisecondsToValidateFileFormat -> 75,
 status -> Completed,
 SparkApplicationName -> <spark_application_name>,
 ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
 request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
 StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
 JDBCConfigurationsSetupAtEPOCH -> 193,
 StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
 FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
 SchemaInferenceCheckDurationInMilliseconds -> 91,
 SaveModeRequested -> Overwrite,
 DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
 DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}

Mais exemplos de código

Usando dados materializados entre células

O Spark DataFrame pode createOrReplaceTempView ser usado para acessar dados obtidos em outra célula, registrando uma exibição temporária.

  • Célula onde os dados são obtidos (digamos com a preferência de idioma do Bloco de Anotações como Scala)
    //Necessary imports
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.SaveMode
    import com.microsoft.spark.sqlanalytics.utils.Constants
    import org.apache.spark.sql.SqlAnalyticsConnector._
    
    //Configure options and read from Synapse Dedicated SQL Pool.
    val readDF = spark.read.
        //Set Synapse Dedicated SQL End Point name.
        option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
        //Set database user name.
        option(Constants.USER, "<user_name>").
        //Set database user's password. 
        option(Constants.PASSWORD, "<user_password>").
        //Set name of the data source definition that is defined with database scoped credentials.
        option(Constants.DATA_SOURCE,"<data_source_name>").
        //Set the three-part table name from which the read must be performed.
        synapsesql("<database_name>.<schema_name>.<table_name>").
        //Optional - specify number of records the DataFrame would read.
        limit(10)
    //Register the temporary view (scope - current active Spark Session)
    readDF.createOrReplaceTempView("<temporary_view_name>")
  • Agora, altere a preferência de idioma no Bloco de Anotações PySpark (Python) e busque dados do modo de exibição registrado <temporary_view_name>
        spark.sql("select * from <temporary_view_name>").show()

Tratamento das respostas

Invocar synapsesql tem dois estados finais possíveis - Sucesso ou um Estado Falhado. Esta seção descreve como lidar com a resposta de solicitação para cada cenário.

Ler resposta de pedido

Após a conclusão, o trecho de resposta de leitura é exibido na saída da célula. Uma falha na célula atual também cancelará as execuções subsequentes da célula. Informações detalhadas sobre erros estão disponíveis nos logs do aplicativo Spark.

Escrever resposta de solicitação

Por padrão, uma resposta de gravação é impressa na saída da célula. Em caso de falha, a célula atual é marcada como falha e as execuções de célula subsequentes serão anuladas. A outra abordagem é passar a opção de identificador de retorno de chamada para o synapsesql método. O identificador de retorno de chamada fornecerá acesso programático à resposta de gravação.

Outras considerações

  • Ao ler as tabelas do Pool SQL Dedicado do Azure Synapse:
    • Considere a aplicação dos filtros necessários no DataFrame para aproveitar o recurso de remoção de colunas do Connector.
    • O cenário de leitura não suporta a TOP(n-rows) cláusula, ao enquadrar as SELECT instruções de consulta. A opção para limitar dados é usar a cláusula limit(.) do DataFrame.
  • Ao escrever nas tabelas do Pool SQL Dedicado do Azure Synapse:
    • Para tipos de tabelas internas:
      • As tabelas são criadas com ROUND_ROBIN distribuição de dados.
      • Os tipos de coluna são inferidos a partir do DataFrame que leria dados da origem. As colunas de cadeia de caracteres são mapeadas para NVARCHAR(4000).
    • Para tipos de tabelas externas:
      • O paralelismo inicial do DataFrame orienta a organização de dados para a tabela externa.
      • Os tipos de coluna são inferidos a partir do DataFrame que leria dados da origem.
    • Uma melhor distribuição de dados entre executores pode ser alcançada ajustando o parâmetro e o spark.sql.files.maxPartitionBytes DataFrame repartition .
    • Ao escrever grandes conjuntos de dados, é importante levar em consideração o impacto da configuração de Nível de Desempenho DWU que limita o tamanho da transação.
  • Monitore as tendências de utilização do Azure Data Lake Storage Gen2 para identificar comportamentos de limitação que podem afetar o desempenho de leitura e gravação.

Referências