Sdílet prostřednictvím


Konektor pro vyhrazený SQL fond služby Azure Synapse pro Apache Spark

Úvod

Konektor vyhrazeného fondu SQL pro Apache Spark v Azure Synapse Analytics umožňuje efektivní přenos velkých datových sad mezi prostředím Apache Spark a vyhrazeným fondem SQL. Konektor se dodává jako výchozí knihovna s Azure Synapse Workspace. Konektor se implementuje pomocí Scala jazyka. Konektor podporuje Scala a Python. Pokud chcete konektor použít s jinými volbami jazyka poznámkového bloku, použijte Spark magic příkaz - %%spark.

Konektor na vysoké úrovni poskytuje následující funkce:

  • Čtení z vyhrazeného SQL fondu Azure Synapse:
    • Čtení velkých datových sad z tabulek vyhrazeného fondu SQL Synapse (interních a externích) a zobrazení
    • Komplexní podpora predikátové optimalizace, při níž se filtry datového rámce mapují na odpovídající predikátovou optimalizaci SQL.
    • Podpora pro vyřezávání sloupců
    • Podpora pro query push down.
  • Zápis do dedikovaného SQL fondu Azure Synapse:
    • Ingestování velkých objemů dat na interní a externí typy tabulek
    • Podporuje následující předvolby režimu ukládání datového rámce:
      • Append
      • ErrorIfExists
      • Ignore
      • Overwrite
    • Typ zápisu do externí tabulky podporuje formát souboru Parquet a textový soubor s oddělovači (příklad – CSV).
    • K zápisu dat do interních tabulek teď konektor místo přístupu CETAS/CTAS používá příkaz COPY.
    • Vylepšení pro optimalizaci výkonu komplexní propustnosti zápisu
    • Představuje volitelný popisovač zpětného volání (argument funkce Scala), který můžou klienti použít k příjmu metrik po zápisu.
      • Mezi příklady patří : počet záznamů, doba trvání dokončení určité akce a důvod selhání.

Přístup k orchestraci

Čtěte

Diagram toku dat vysoké úrovně, který popisuje orchestraci žádosti o čtení konektoru.

Napiš

Diagram toku dat vysoké úrovně, který popisuje orchestraci žádosti o zápis konektoru.

Požadavky

Požadavky, jako je nastavení požadovaných prostředků Azure a postup jejich konfigurace, jsou popsány v této části.

Prostředky Azure

Zkontrolujte a nastavte následující závislé prostředky Azure:

Příprava databáze

Připojte se k databázi vyhrazeného fondu Synapse SQL a spusťte následující konfigurační příkazy:

  • Vytvořte uživatele databáze, který je namapovaný na identitu uživatele Microsoft Entra sloužící k přihlášení k pracovnímu prostoru Azure Synapse.

    CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;      
    
  • Vytvořte schéma, ve kterém budou definovány tabulky, aby konektor mohl úspěšně zapisovat a číst z příslušných tabulek.

    CREATE SCHEMA [<schema_name>];
    

Ověřování

Ověřování pomocí ID Microsoft Entra

Ověřování na základě ID Microsoftu je komplexní metoda ověřování. Uživatel se musí úspěšně přihlásit k pracovnímu prostoru Azure Synapse Analytics.

Základní ověřování

Základní přístup ověřování vyžaduje, aby uživatel nakonfiguroval username a password. Informace o relevantních parametrech konfigurace pro čtení a zápis do tabulek ve vyhrazeném fondu SQL Azure Synapse najdete v části Možnosti konfigurace.

Autorizace

Azure Data Lake Storage Gen2

Přístupová oprávnění ke službě Azure Data Lake Storage Gen2 – Účet úložiště můžete udělit dvěma způsoby:

  • Řízení přístupu na základě rolí – Přispěvatel dat do objektů blob úložiště
    • Přiřazením Storage Blob Data Contributor Role se uživatel oprávní ke čtení, zápisu a odstraňování z kontejnerů objektů blob v rámci služby Azure Storage.
    • RBAC nabízí hrubou kontrolu na úrovni kontejneru.
  • Seznamy řízení přístupu (ACL)
    • Přístup ACL umožňuje jemně odstupňovanou kontrolu nad konkrétními cestami nebo soubory v dané složce.
    • Kontroly ACL se nevynucují, pokud má uživatel již oprávnění udělená pomocí přístupu RBAC.
    • Existují dva hlavní typy oprávnění ACL:
      • Přístupová oprávnění (použitá na konkrétní úrovni nebo objektu)
      • Výchozí oprávnění (automaticky uplatňována pro všechny podřízené objekty v okamžiku jejich vytvoření).
    • Typ oprávnění zahrnuje:
      • Execute umožňuje procházet hierarchie složek nebo procházet je.
      • Read umožňuje číst.
      • Write umožňuje psát.
    • Je důležité nakonfigurovat ACL tak, aby konektor mohl úspěšně zapisovat a číst z umístění úložiště.

Poznámka:

  • Pokud chcete spouštět poznámkové bloky pomocí kanálů pracovního prostoru Synapse, musíte také udělit výše uvedená přístupová oprávnění výchozí spravované identitě pracovního prostoru Synapse. Výchozí název spravované identity pracovního prostoru je stejný jako název pracovního prostoru.

  • Pokud chcete používat pracovní prostor Synapse se zabezpečenými účty úložiště, musí být spravovaný privátní koncový bod nakonfigurovaný z notebooku. Spravovaný soukromý koncový bod musí být schválen v podokně Private endpoint connections v části účtu úložiště ADLS Gen2 Networking.

Vyhrazený fond SQL služby Azure Synapse

Pokud chcete povolit úspěšnou interakci s dedikovaným fondem SQL Azure Synapse, je nutné provést následující autorizaci, pokud nejste uživatelem, který je také nakonfigurován na dedikovaném koncovém bodě SQL jako Active Directory Admin.

  • Přečíst scénář

    • Udělte uživateli db_exporter pomocí systémové uložené procedury sp_addrolemember.

      EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
      
  • Napište scénář

    • Konektor používá příkaz COPY k zápisu dat z prostředí pro dočasné uložení do spravovaného prostoru interní tabulky.
      • Nakonfigurujte požadovaná oprávnění popsaná tady.

      • Následuje fragment kódu rychlého přístupu stejného:

        --Make sure your user has the permissions to CREATE tables in the [dbo] schema
        GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com];
        GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions
        GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has INSERT permissions on the target table
        GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
        

Dokumentace k rozhraní API

Konektor dedikovaného SQL fondu pro Azure Synapse pro Apache Spark – Dokumentace API

Možnosti konfigurace

Pokud chcete úspěšně spustit a orchestrovat operaci čtení nebo zápisu, konektor očekává určité konfigurační parametry. Definice objektu – com.microsoft.spark.sqlanalytics.utils.Constants poskytuje seznam standardizovaných konstant pro každý klíč parametru.

Následuje seznam možností konfigurace na základě scénáře použití:

  • Čtení pomocí ověřování prostřednictvím Microsoft Entra ID
    • Přihlašovací údaje se automaticky namapují a uživatel nemusí poskytovat konkrétní možnosti konfigurace.
    • Argument názvu třídílné tabulky v synapsesql metodě se vyžaduje ke čtení z příslušné tabulky ve vyhrazeném fondu SQL Azure Synapse.
  • Čtení pomocí základního ověřování
    • Vyhrazený koncový bod SQL služby Azure Synapse
      • Constants.SERVER – Koncový bod vyhrazeného fondu SQL Synapse (FQDN serveru)
      • Constants.USER – Uživatelské jméno SQL.
      • Constants.PASSWORD – Heslo uživatele SQL.
    • Koncový bod služby Azure Data Lake Storage (Gen 2) – Přechodné složky
      • Constants.DATA_SOURCE – Cesta k úložišti nastavená na parametr umístění zdroje dat se používá pro zpracování dat.
  • Zápis pomocí ověřování založeného na Microsoft Entra ID
    • Vyhrazený koncový bod SQL služby Azure Synapse
      • Ve výchozím nastavení konektor odvodí koncový bod Synapse Dedicated SQL pomocí názvu databáze nastaveného na tříčástém názvu tabulky v parametru metody synapsesql.
      • Případně můžou uživatelé použít Constants.SERVER možnost zadat koncový bod SQL. Ujistěte se, že koncový bod hostuje odpovídající databázi s příslušným schématem.
    • Koncový bod služby Azure Data Lake Storage (Gen 2) – Přechodné složky
      • Pro interní typ tabulky:
        • Nakonfigurujte jednu Constants.TEMP_FOLDER nebo Constants.DATA_SOURCE možnost.
        • Pokud uživatel zvolí možnost Constants.DATA_SOURCE, pracovní složka bude odvozena z hodnoty location z DataSource.
        • Pokud jsou k dispozici obě možnosti, bude použita hodnota možnosti Constants.TEMP_FOLDER.
        • Pokud není k dispozici možnost pracovní složky, konektor odvodí jednu na základě konfigurace modulu runtime - spark.sqlanalyticsconnector.stagingdir.prefix.
      • Pro externí typ tabulky:
        • Constants.DATA_SOURCE je požadovaná možnost konfigurace.
        • Konektor používá cestu k úložišti nastavenou pro parametr umístění zdroje dat v kombinaci s argumentem locationsynapsesql metody a odvozuje absolutní cestu k zachování dat externí tabulky.
        • location Pokud není zadaný argument metodysynapsesql, konektor odvodí hodnotu umístění jako <base_path>/dbName/schemaName/tableName.
  • Psaní pomocí základního ověřování
    • Vyhrazený koncový bod SQL služby Azure Synapse
      • Constants.SERVER - – Koncový bod vyhrazeného fondu SQL Synapse (plně kvalifikovaný název domény serveru).
      • Constants.USER – Uživatelské jméno SQL.
      • Constants.PASSWORD – Heslo uživatele SQL.
      • Constants.STAGING_STORAGE_ACCOUNT_KEY přidružený k účtu úložiště, který hostuje Constants.TEMP_FOLDERS (pouze interní typy tabulek) nebo Constants.DATA_SOURCE.
    • Koncový bod služby Azure Data Lake Storage (Gen 2) – Přechodné složky
      • Přihlašovací údaje základního ověřování SQL se nevztahují na přístup ke koncovým bodům úložiště.
      • Proto se ujistěte, že chcete přiřadit relevantní přístupová oprávnění k úložišti, jak je popsáno v části Azure Data Lake Storage Gen2.

Šablony kódu

Tato část obsahuje referenční šablony kódu, které popisují použití a vyvolání konektoru vyhrazeného fondu SQL Azure Synapse pro Apache Spark.

Poznámka:

Použití konektoru v Pythonu

Čtení z dedikovaného SQL fondu služby Azure Synapse Analytics

Read Request – synapsesql podpis metody

synapsesql(tableName:String="") => org.apache.spark.sql.DataFrame

Čtení z tabulky pomocí ověřování založeného na Microsoft Entra ID

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)

//Show contents of the dataframe
dfToReadFromTable.show()

Čtení z dotazu pomocí ověřování založeného na Microsoft Entra ID

Poznámka:

Omezení při čtení z dotazu:

  • Název tabulky a dotaz nelze zadat současně.
  • Jsou povoleny pouze výběrové dotazy. DDL a DML příkazy SQL nejsou povoleny.
  • Možnosti výběru a filtru datového rámce se při zadání dotazu neodsouvají do vyhrazeného fondu SQL.
  • Čtení z dotazu je dostupné jenom ve Sparku 3.
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._


// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
    // Name of the SQL Dedicated Pool or database where to run the query
    // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     // Name of the SQL Dedicated Pool or database where to run the query
     // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>")
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")


//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Čteme z tabulky pomocí základního ověřování

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the table will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)
    

//Show contents of the dataframe
dfToReadFromTable.show()

Čtení z dotazu pomocí základního ověřování

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")

// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
    

//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Zápis do dedikovaného SQL fondu Azure Synapse

Žádost o zápis – synapsesql signatura metody

synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None,
           callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit

Psaní s autentizací založenou na identitě Microsoft Entra ID

Následuje komplexní šablona kódu, která popisuje použití konektoru pro scénáře zápisu:

//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"

//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")

//Initialize DataFrame that reads CSV data from a given source 
val readDF:DataFrame=spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(1000) //Reads first 1000 rows from the source CSV input.

//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
//    1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
//    2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab. 
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                            Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
    (feedback: Map[String, Any], errorState: Option[Throwable]) => {
    println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
    errorDuringWrite = errorState
}

//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
    write.
    //Configure required configurations.
    options(writeOptionsWithAADAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite).
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL, 
                //Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
                location = None, 
                //Optional parameter to receive a callback.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get

Psát pomocí základního ověřování

Následující fragment kódu nahrazuje definici zápisu popsanou v části Write using Microsoft Entra ID based authentication za účelem odeslání požadavku na zápis pomocí přístupu základního ověřování SQL.

//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                           //Set database user name
                                           Constants.USER -> "<user_name>",
                                           //Set database user's password
                                           Constants.PASSWORD -> "<user_password>",
                                           //Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
                                           Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
                                           //To be used only when writing to internal tables. Storage path will be used for data staging.
                                           Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Configure and submit the request to write to Synapse Dedicated SQL Pool. 
readDF.
    write.
    options(writeOptionsWithBasicAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite). 
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL,
                //Not required for writing to an internal table 
                location = None,
                //Optional parameter.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

Při použití základních metod ověřování jsou potřebné další možnosti konfigurace k přečtení dat ze zdrojové cesty úložiště. Následující úryvek kódu poskytuje příklad pro čtení ze zdroje dat Azure Data Lake Storage Gen2 pomocí přihlašovacích údajů principála služby.

//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
                                "delimiter"->",", 
                                "fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
                                s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
                                s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" -> 
                                    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                                "fs.azure.account.oauth2.client.id" -> s"$spnClientId",
                                "fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
                                "fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
                                "fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
                                "fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(100)

Podporované režimy ukládání datového rámce

Při zápisu zdrojových dat do cílové tabulky ve vyhrazeném fondu SQL Azure Synapse se podporují následující režimy ukládání:

  • ErrorIfExists (výchozí režim ukládání)
    • Pokud cílová tabulka existuje, zápis se přeruší s výjimkou vrácenou volaným. V opačném případě se vytvoří nová tabulka s daty z přípravných složek.
  • Ignorovat
    • Pokud cílová tabulka existuje, bude zápis ignorovat požadavek na zápis bez vrácení chyby. V opačném případě se vytvoří nová tabulka s daty z přípravných složek.
  • Přepsat
    • Pokud cílová tabulka existuje, stávající data v cílové tabulce jsou nahrazena daty z pracovních složek. V opačném případě se vytvoří nová tabulka s daty z přípravných složek.
  • Připojit
    • Pokud cílová tabulka existuje, připojí se k ní nová data. V opačném případě se vytvoří nová tabulka s daty z přípravných složek.

Popisovač zpětného volání požadavku na zápis

Nové rozhraní API cest zápisu zavedlo experimentální funkci, která klientovi poskytuje mapu klíč-hodnota> metrik po zápisu. Klíče pro metriky jsou definovány v nové definici objektu - Constants.FeedbackConstants. Metriky je možné načíst jako řetězec JSON pomocí popisovače zpětného volání (a Scala Function). Následuje podpis funkce:

//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit

Tady jsou některé významné metriky (uvedené v camel case):

  • WriteFailureCause
  • DataStagingSparkJobDurationInMilliseconds
  • NumberOfRecordsStagedForSQLCommit
  • SQLStatementExecutionDurationInMilliseconds
  • rows_processed

Následuje ukázkový řetězec JSON s metrikami po zápisu:

{
 SparkApplicationId -> <spark_yarn_application_id>,
 SQLStatementExecutionDurationInMilliseconds -> 10113,
 WriteRequestReceivedAtEPOCH -> 1647523790633,
 WriteRequestProcessedAtEPOCH -> 1647523808379,
 StagingDataFileSystemCheckDurationInMilliseconds -> 60,
 command -> "COPY INTO [schema_name].[table_name] ...",
 NumberOfRecordsStagedForSQLCommit -> 100,
 DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
 SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
 DataStagingSparkJobDurationInMilliseconds -> 5252,
 rows_processed -> 100,
 SaveModeApplied -> TRUNCATE_COPY,
 DurationInMillisecondsToValidateFileFormat -> 75,
 status -> Completed,
 SparkApplicationName -> <spark_application_name>,
 ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
 request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
 StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
 JDBCConfigurationsSetupAtEPOCH -> 193,
 StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
 FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
 SchemaInferenceCheckDurationInMilliseconds -> 91,
 SaveModeRequested -> Overwrite,
 DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
 DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}

Další ukázky kódu

Použití materializovaných dat napříč buňkami

Datový rámec createOrReplaceTempView Sparku lze použít k přístupu k datům načteným v jiné buňce registrací dočasného zobrazení.

  • Buňka, ze které se načítají data (například s předvolbou jazyka poznámkového bloku jako Scala)
    //Necessary imports
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.SaveMode
    import com.microsoft.spark.sqlanalytics.utils.Constants
    import org.apache.spark.sql.SqlAnalyticsConnector._
    
    //Configure options and read from Synapse Dedicated SQL Pool.
    val readDF = spark.read.
        //Set Synapse Dedicated SQL End Point name.
        option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
        //Set database user name.
        option(Constants.USER, "<user_name>").
        //Set database user's password. 
        option(Constants.PASSWORD, "<user_password>").
        //Set name of the data source definition that is defined with database scoped credentials.
        option(Constants.DATA_SOURCE,"<data_source_name>").
        //Set the three-part table name from which the read must be performed.
        synapsesql("<database_name>.<schema_name>.<table_name>").
        //Optional - specify number of records the DataFrame would read.
        limit(10)
    //Register the temporary view (scope - current active Spark Session)
    readDF.createOrReplaceTempView("<temporary_view_name>")
  • Teď změňte předvolbu jazyka v poznámkovém bloku na PySpark (Python) a načtěte data z registrovaného zobrazení. <temporary_view_name>
        spark.sql("select * from <temporary_view_name>").show()

Zpracování odpovědí

Vyvolání synapsesql má dva možné koncové stavy – úspěch nebo stav selhání. Tato část popisuje, jak zpracovat odpověď na žádost pro jednotlivé scénáře.

Odpověď na žádost o čtení

Po dokončení se čtecí odpověď zobrazí ve výstupu buňky. Selhání v aktuální buňce zruší také provádění následujících buněk. Podrobné informace o chybách jsou k dispozici v protokolech aplikací Sparku.

Odpověď na žádost o zápis

Ve výchozím nastavení se do výstupu buňky vytiskne odezva na zápis. Při selhání je aktuální buňka označena jako neúspěšná a následné provádění buněk bude přerušeno. Druhým přístupem je předat metodě možnost handlu zpětného volání. Popisovač zpětného hovoru poskytne programátorský přístup k odpovědi na zápis.

Ostatní úvahy

  • Při čtení z tabulek vyhrazeného fondu SQL Azure Synapse:
    • Zvažte použití nezbytných filtrů v datovém rámci, abyste mohli využít funkci vyřazení sloupců konektoru.
    • Scénář čtení nepodporuje TOP(n-rows) klauzuli při vytváření SELECT rámců příkazů dotazu. Volba pro omezení dat spočívá v použití klauzule Limit(.) datového rámce.
  • Při zápisu do tabulek vyhrazeného fondu SQL Azure Synapse:
    • Pro interní typy tabulek:
      • Tabulky se vytvářejí rozdělením dat pomocí ROUND_ROBIN.
      • Typy sloupců se odvozují z datového rámce, který bude číst data ze zdroje. Řetězcové sloupce jsou mapovány na NVARCHAR(4000).
    • Pro externí typy tabulek:
      • Počáteční paralelismus datového rámce řídí organizaci dat pro externí tabulku.
      • Typy sloupců se odvozují z datového rámce, který bude číst data ze zdroje.
    • Lepší distribuce dat mezi exekutory lze dosáhnout vyladěním spark.sql.files.maxPartitionBytes a parametru repartition datového rámce.
    • Při psaní velkých datových sad je důležité při rozhodování o dopadu nastavení úrovně výkonu DWU, které omezuje velikost transakce.
  • Monitorujte trendy využití Azure Data Lake Storage Gen2 a sledujte chování omezování, které může mít vliv na výkon čtení a zápisu.