Azure Synapse toegewezen SQL-poolconnector voor Apache Spark
Introductie
De toegewezen SQL-pool van Azure Synapse Verbinding maken or voor Apache Spark in Azure Synapse Analytics maakt een efficiënte overdracht van grote gegevenssets mogelijk tussen de Apache Spark-runtime en de toegewezen SQL-pool. De connector wordt geleverd als een standaardbibliotheek met Azure Synapse-werkruimte. De connector wordt geïmplementeerd met behulp van Scala
de taal. De connector ondersteunt Scala en Python. Als u de Verbinding maken or wilt gebruiken met andere taalopties voor notebooks, gebruikt u de Magic-opdracht van Spark - %%spark
.
Op hoog niveau biedt de connector de volgende mogelijkheden:
- Lezen uit de toegewezen SQL-pool van Azure Synapse:
- Lees grote gegevenssets uit Synapse Dedicated SQL-pooltabellen (intern en extern) en weergaven.
- Uitgebreide push-downondersteuning voor predicaat, waarbij filters op DataFrame worden toegewezen aan de bijbehorende SQL-predicaatpush omlaag.
- Ondersteuning voor het snoeien van kolommen.
- Ondersteuning voor het pushen van query's.
- Schrijven naar een toegewezen SQL-pool van Azure Synapse:
- Gegevens van grote volumes opnemen in interne en externe tabeltypen.
- Ondersteunt de volgende voorkeuren voor de opslagmodus van DataFrame:
Append
ErrorIfExists
Ignore
Overwrite
- Schrijven naar extern tabeltype ondersteunt parquet- en tekstbestandsindeling met scheidingstekens (bijvoorbeeld CSV).
- Als u gegevens naar interne tabellen wilt schrijven, gebruikt de connector nu de COPY-instructie in plaats van de CETAS-/CTAS-benadering.
- Verbeteringen voor het optimaliseren van end-to-end schrijfdoorvoerprestaties.
- Introduceert een optionele call-back-handle (een scala-functieargument) die clients kunnen gebruiken om metrische gegevens na schrijven te ontvangen.
- Enkele voorbeelden zijn: het aantal records, de duur voor het voltooien van een bepaalde actie en de reden van de fout.
Indelingsbenadering
Lezen
Write
Vereisten
Vereisten, zoals het instellen van vereiste Azure-resources en stappen voor het configureren ervan, worden in deze sectie besproken.
Azure-resources
Controleer en stel de volgende afhankelijke Azure-resources in:
- Azure Data Lake Storage : wordt gebruikt als het primaire opslagaccount voor de Azure Synapse-werkruimte.
- Azure Synapse-werkruimte : notebooks maken, op DataFrame gebaseerde werkstromen voor inkomend verkeer maken en implementeren.
- Toegewezen SQL-pool (voorheen SQL DW): biedt enterprise-Databeheersysteem-functies.
- Serverloze Spark-pool van Azure Synapse - Spark-runtime waarin de taken worden uitgevoerd als Spark-toepassingen.
De database voorbereiden
Verbinding maken naar de Synapse Dedicated SQL-pooldatabase en voer de volgende installatie-instructies uit:
Maak een databasegebruiker die is toegewezen aan de Microsoft Entra-gebruikersidentiteit die wordt gebruikt om u aan te melden bij de Azure Synapse-werkruimte.
CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;
Maak een schema waarin tabellen worden gedefinieerd, zodat de Verbinding maken or kan schrijven naar en lezen vanuit respectieve tabellen.
CREATE SCHEMA [<schema_name>];
Verificatie
Verificatie op basis van Microsoft Entra-id
Verificatie op basis van Microsoft Entra ID is een geïntegreerde verificatiemethode. De gebruiker moet zich aanmelden bij de Azure Synapse Analytics-werkruimte.
Basisverificatie
Voor een basisverificatiebenadering moet de gebruiker de configuratie en password
opties instellenusername
. Raadpleeg de sectie - Configuratieopties voor meer informatie over relevante configuratieparameters voor het lezen van en schrijven naar tabellen in Azure Synapse Dedicated SQL-pool.
Autorisatie
Azure Data Lake Storage Gen2
Er zijn twee manieren om toegangsmachtigingen te verlenen aan Azure Data Lake Storage Gen2 - Opslagaccount:
- Rol op basis van toegangsbeheer voor opslagblobgegevens
- Als u de
Storage Blob Data Contributor Role
gebruiker toewijst, worden de gebruikersmachtigingen verleend voor lezen, schrijven en verwijderen uit de Azure Storage Blob-containers. - RBAC biedt een benadering voor grof beheer op containerniveau.
- Als u de
- Toegangsbeheerlijsten (ACL)
- Met ACL-benadering kunt u nauwkeurige besturingselementen toepassen op specifieke paden en/of bestanden onder een bepaalde map.
- ACL-controles worden niet afgedwongen als de gebruiker al machtigingen krijgt met behulp van de RBAC-benadering.
- Er zijn twee algemene typen ACL-machtigingen:
- Toegangsmachtigingen (toegepast op een specifiek niveau of object).
- Standaardmachtigingen (automatisch toegepast op alle onderliggende objecten op het moment dat ze zijn gemaakt).
- Het type machtigingen zijn onder andere:
Execute
maakt het mogelijk om door de mappenhiërarchieën te navigeren of te navigeren.Read
maakt het mogelijk om te lezen.Write
maakt het mogelijk om te schrijven.
- Het is belangrijk om ACL's te configureren, zodat de Verbinding maken or kan schrijven en lezen vanaf de opslaglocaties.
Notitie
Als u notebooks wilt uitvoeren met behulp van Synapse Workspace-pijplijnen, moet u ook hierboven vermelde toegangsmachtigingen verlenen voor de standaard beheerde identiteit van de Synapse-werkruimte. De standaardnaam van de beheerde identiteit van de werkruimte is hetzelfde als de naam van de werkruimte.
Als u de Synapse-werkruimte wilt gebruiken met beveiligde opslagaccounts, moet een beheerd privé-eindpunt worden geconfigureerd vanuit het notebook. Het beheerde privé-eindpunt moet worden goedgekeurd vanuit de sectie van
Private endpoint connections
het ADLS Gen2-opslagaccount in hetNetworking
deelvenster.
Toegewezen SQL-pool van Azure Synapse
Als u een succesvolle interactie met een toegewezen SQL-pool van Azure Synapse wilt inschakelen, is de volgende autorisatie nodig, tenzij u een gebruiker bent die ook is geconfigureerd als een Active Directory Admin
toegewezen SQL-eindpunt:
Scenario lezen
Verdeel de gebruiker
db_exporter
met behulp van de door het systeem opgeslagen proceduresp_addrolemember
.EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
Scenario schrijven
- Verbinding maken or de opdracht COPY gebruikt om gegevens te schrijven van fasering naar de beheerde locatie van de interne tabel.
Configureer de vereiste machtigingen die hier worden beschreven.
Hier volgt een snel toegangsfragment van hetzelfde:
--Make sure your user has the permissions to CREATE tables in the [dbo] schema GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com]; GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com]; --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com]; --Make sure your user has INSERT permissions on the target table GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
- Verbinding maken or de opdracht COPY gebruikt om gegevens te schrijven van fasering naar de beheerde locatie van de interne tabel.
API-documentatie
Azure Synapse Dedicated SQL-pool Verbinding maken or voor Documentatie voor Apache Spark - API.
Configuratieopties
Als u de lees- of schrijfbewerking wilt opstarten en organiseren, verwacht de Verbinding maken or bepaalde configuratieparameters. De objectdefinitie: com.microsoft.spark.sqlanalytics.utils.Constants
biedt een lijst met gestandaardiseerde constanten voor elke parametersleutel.
Hieronder volgt de lijst met configuratieopties op basis van gebruiksscenario:
- Lezen met verificatie op basis van Microsoft Entra ID
- Referenties zijn automatisch toegewezen en de gebruiker hoeft geen specifieke configuratieopties te bieden.
- Argument voor tabelnaam in drie delen voor
synapsesql
de methode is vereist om te lezen uit de respectieve tabel in de Toegewezen SQL-pool van Azure Synapse.
- Lezen met basisverificatie
- Toegewezen SQL-eindpunt van Azure Synapse
Constants.SERVER
- Eindpunt van toegewezen SQL-pool synapse (server-FQDN)Constants.USER
- SQL-gebruikersnaam.Constants.PASSWORD
- SQL-gebruikerswachtwoord.
- Eindpunt van Azure Data Lake Storage (Gen 2) - Faseringsmappen
Constants.DATA_SOURCE
- Het opslagpad dat is ingesteld op de locatieparameter van de gegevensbron wordt gebruikt voor het faseren van gegevens.
- Toegewezen SQL-eindpunt van Azure Synapse
- Schrijven met verificatie op basis van Microsoft Entra ID
- Toegewezen SQL-eindpunt van Azure Synapse
- Standaard wordt met de Verbinding maken or het Synapse Dedicated SQL-eindpunt afgeleid met behulp van de databasenaam die is ingesteld op de parameter voor de driedelige tabelnaam van de
synapsesql
methode. - Gebruikers kunnen ook de
Constants.SERVER
optie gebruiken om het SQL-eindpunt op te geven. Zorg ervoor dat het eindpunt als host fungeert voor de bijbehorende database met het respectieve schema.
- Standaard wordt met de Verbinding maken or het Synapse Dedicated SQL-eindpunt afgeleid met behulp van de databasenaam die is ingesteld op de parameter voor de driedelige tabelnaam van de
- Eindpunt van Azure Data Lake Storage (Gen 2) - Faseringsmappen
- Voor intern tabeltype:
Constants.TEMP_FOLDER
Configureer een van beide ofConstants.DATA_SOURCE
opties.- Als de gebruiker ervoor heeft gekozen om een optie op te geven
Constants.DATA_SOURCE
, wordt de faseringsmap afgeleid met behulp van delocation
waarde uit de DataSource. - Als beide zijn opgegeven, wordt de
Constants.TEMP_FOLDER
optiewaarde gebruikt. - Als er geen optie voor een faseringsmap is, leidt de Verbinding maken or er een af op basis van de runtimeconfiguratie -
spark.sqlanalyticsconnector.stagingdir.prefix
.
- Voor extern tabeltype:
Constants.DATA_SOURCE
is een vereiste configuratieoptie.- De connector maakt gebruik van het opslagpad dat is ingesteld op de locatieparameter van de gegevensbron in combinatie met het
location
argument voor desynapsesql
methode en leidt het absolute pad af om externe tabelgegevens op te slaan. - Als het argument voor
synapsesql
delocation
methode niet is opgegeven, wordt de locatiewaarde door de connector afgeleid als<base_path>/dbName/schemaName/tableName
.
- Voor intern tabeltype:
- Toegewezen SQL-eindpunt van Azure Synapse
- Schrijven met basisverificatie
- Toegewezen SQL-eindpunt van Azure Synapse
Constants.SERVER
- Synapse Toegewezen SQL-pooleindpunt (server-FQDN).Constants.USER
- SQL-gebruikersnaam.Constants.PASSWORD
- SQL-gebruikerswachtwoord.Constants.STAGING_STORAGE_ACCOUNT_KEY
gekoppeld aan opslagaccount dat als host fungeertConstants.TEMP_FOLDERS
(alleen interne tabeltypen) ofConstants.DATA_SOURCE
.
- Eindpunt van Azure Data Lake Storage (Gen 2) - Faseringsmappen
- Sql-basisverificatiereferenties zijn niet van toepassing op toegang tot opslageindpunten.
- Zorg er daarom voor dat u relevante machtigingen voor opslagtoegang toewijst, zoals beschreven in de sectie Azure Data Lake Storage Gen2.
- Toegewezen SQL-eindpunt van Azure Synapse
Codesjablonen
In deze sectie vindt u referentiecodesjablonen om te beschrijven hoe u de toegewezen SQL-pool van Azure Synapse gebruikt en aanroept Verbinding maken or voor Apache Spark.
Notitie
De Verbinding maken or gebruiken in Python-
- De connector wordt alleen ondersteund in Python voor Spark 3. Voor Spark 2.4 (niet-ondersteund) kunnen we de Scala-connector-API gebruiken om te communiceren met inhoud van een DataFrame in PySpark met behulp van DataFrame.createOrReplaceTempView of DataFrame.createOrReplaceGlobalTempView. Zie Sectie: gerealiseerde gegevens in cellen gebruiken.
- De terugbelgreep is niet beschikbaar in Python.
Lezen uit de toegewezen SQL-pool van Azure Synapse
Aanvraag lezen - synapsesql
methodehandtekening
Lezen uit een tabel met verificatie op basis van Microsoft Entra-id
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Defaults to storage path defined in the runtime configurations
option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
//Three-part table name from where data will be read.
synapsesql("<database_name>.<schema_name>.<table_name>").
//Column-pruning i.e., query select column values.
select("<some_column_1>", "<some_column_5>", "<some_column_n>").
//Push-down filter criteria that gets translated to SQL Push-down Predicates.
filter(col("Title").startsWith("E")).
//Fetch a sample of 10 records
limit(10)
//Show contents of the dataframe
dfToReadFromTable.show()
Lezen uit een query met behulp van verificatie op basis van Microsoft Entra ID
Notitie
Beperkingen tijdens het lezen van query's:
- Tabelnaam en query kunnen niet tegelijkertijd worden opgegeven.
- Alleen selectiequery's zijn toegestaan. DDL- en DML-MCL's zijn niet toegestaan.
- De opties voor selecteren en filteren in het dataframe worden niet naar de toegewezen SQL-pool gepusht wanneer een query wordt opgegeven.
- Lezen uit een query is alleen beschikbaar in Spark 3.1 en 3.2.
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>").
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Defaults to storage path defined in the runtime configurations
option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
//query from which data will be read
.option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
synapsesql()
val dfToReadFromQueryAsArgument:DataFrame = spark.read.
// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>")
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Defaults to storage path defined in the runtime configurations
option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
//query from which data will be read
.synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()
Lezen uit een tabel met basisverificatie
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Set database user name
option(Constants.USER, "<user_name>").
//Set user's password to the database
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
//Data extracted from the table will be staged to the storage path defined on the data source's location setting.
option(Constants.DATA_SOURCE, "<data_source_name>").
//Three-part table name from where data will be read.
synapsesql("<database_name>.<schema_name>.<table_name>").
//Column-pruning i.e., query select column values.
select("<some_column_1>", "<some_column_5>", "<some_column_n>").
//Push-down filter criteria that gets translated to SQL Push-down Predicates.
filter(col("Title").startsWith("E")).
//Fetch a sample of 10 records
limit(10)
//Show contents of the dataframe
dfToReadFromTable.show()
Lezen uit een query met basisverificatie
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")
// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
//Name of the SQL Dedicated Pool or database where to run the query
//Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>").
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Set database user name
option(Constants.USER, "<user_name>").
//Set user's password to the database
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
//Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
option(Constants.DATA_SOURCE, "<data_source_name>").
//Query where data will be read.
option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
synapsesql()
val dfToReadFromQueryAsArgument:DataFrame = spark.read.
//Name of the SQL Dedicated Pool or database where to run the query
//Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
option(Constants.DATABASE, "<database_name>").
//If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
//to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
//Set database user name
option(Constants.USER, "<user_name>").
//Set user's password to the database
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
//Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
option(Constants.DATA_SOURCE, "<data_source_name>").
//Query where data will be read.
synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()
Schrijven naar toegewezen SQL-pool van Azure Synapse
Schrijfaanvraag - synapsesql
methodehandtekening
De methodehandtekening voor de Verbinding maken orversie die is gebouwd voor Spark 2.4.8 heeft één minder argument dan die is toegepast op de Versie van Spark 3.1.2. Hieronder ziet u de twee methodehandtekeningen:
- Spark-pool versie 2.4.8
synapsesql(tableName:String,
tableType:String = Constants.INTERNAL,
location:Option[String] = None):Unit
- Spark-pool versie 3.1.2
synapsesql(tableName:String,
tableType:String = Constants.INTERNAL,
location:Option[String] = None,
callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit
Schrijven met verificatie op basis van Microsoft Entra ID
Hieronder vindt u een uitgebreide codesjabloon waarin wordt beschreven hoe u de Verbinding maken or gebruikt voor schrijfscenario's:
//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"
//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")
//Initialize DataFrame that reads CSV data from a given source
val readDF:DataFrame=spark.
read.
options(dfReadOptions).
csv(pathToInputSource).
limit(1000) //Reads first 1000 rows from the source CSV input.
//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
// 1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
// 2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab.
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")
//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
(feedback: Map[String, Any], errorState: Option[Throwable]) => {
println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
errorDuringWrite = errorState
}
//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
write.
//Configure required configurations.
options(writeOptionsWithAADAuth).
//Choose a save mode that is apt for your use case.
mode(SaveMode.Overwrite).
synapsesql(tableName = "<database_name>.<schema_name>.<table_name>",
//For external table type value is Constants.EXTERNAL
tableType = Constants.INTERNAL,
//Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
location = None,
//Optional parameter to receive a callback.
callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))
//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get
Schrijven met basisverificatie
Met het volgende codefragment wordt de schrijfdefinitie vervangen die wordt beschreven in de sectie Schrijven met behulp van verificatie op basis van Microsoft Entra ID, om schrijfaanvragen in te dienen met behulp van de basisverificatiemethode van SQL:
//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
//Set database user name
Constants.USER -> "<user_name>",
//Set database user's password
Constants.PASSWORD -> "<user_password>",
//Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
//To be used only when writing to internal tables. Storage path will be used for data staging.
Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")
//Configure and submit the request to write to Synapse Dedicated SQL Pool.
readDF.
write.
options(writeOptionsWithBasicAuth).
//Choose a save mode that is apt for your use case.
mode(SaveMode.Overwrite).
synapsesql(tableName = "<database_name>.<schema_name>.<table_name>",
//For external table type value is Constants.EXTERNAL
tableType = Constants.INTERNAL,
//Not required for writing to an internal table
location = None,
//Optional parameter.
callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))
Voor een basisverificatiemethode zijn andere configuratieopties vereist om gegevens uit een bronopslagpad te lezen. Het volgende codefragment biedt een voorbeeld om te lezen uit een Azure Data Lake Storage Gen2-gegevensbron met behulp van service-principalreferenties:
//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
"delimiter"->",",
"fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" ->
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id" -> s"$spnClientId",
"fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
"fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
"fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
"fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
read.
options(dfReadOptions).
csv(pathToInputSource).
limit(100)
Ondersteunde dataframe-opslagmodi
De volgende opslagmodi worden ondersteund bij het schrijven van brongegevens naar een doeltabel in een toegewezen SQL-pool van Azure Synapse:
- ErrorIfExists (standaard opslaanmodus)
- Als de doeltabel bestaat, wordt de schrijfbewerking afgebroken met een uitzondering die wordt geretourneerd naar de aangeroepene. Anders wordt er een nieuwe tabel gemaakt met gegevens uit de faseringsmappen.
- Negeren
- Als de doeltabel bestaat, negeert de schrijfbewerking de schrijfaanvraag zonder een fout te retourneren. Anders wordt er een nieuwe tabel gemaakt met gegevens uit de faseringsmappen.
- Overschrijven
- Als de doeltabel bestaat, worden bestaande gegevens in de bestemming vervangen door gegevens uit de faseringsmappen. Anders wordt er een nieuwe tabel gemaakt met gegevens uit de faseringsmappen.
- Toevoegen
- Als de doeltabel bestaat, worden de nieuwe gegevens eraan toegevoegd. Anders wordt er een nieuwe tabel gemaakt met gegevens uit de faseringsmappen.
Callback-handle voor write-aanvragen
Met de nieuwe api voor schrijfpadwijzigingen is een experimentele functie geïntroduceerd om de client een sleutelwaardetoewijzing> van metrische gegevens na schrijven te bieden. Sleutels voor de metrische gegevens worden gedefinieerd in de nieuwe objectdefinitie - Constants.FeedbackConstants
. Metrische gegevens kunnen worden opgehaald als een JSON-tekenreeks door te geven in de callback-handle (a Scala Function
). Hier volgt de functiehandtekening:
//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit
Hieronder volgen enkele belangrijke metrische gegevens (gepresenteerd in kameel geval):
WriteFailureCause
DataStagingSparkJobDurationInMilliseconds
NumberOfRecordsStagedForSQLCommit
SQLStatementExecutionDurationInMilliseconds
rows_processed
Hier volgt een voorbeeld van een JSON-tekenreeks met metrische gegevens na schrijven:
{
SparkApplicationId -> <spark_yarn_application_id>,
SQLStatementExecutionDurationInMilliseconds -> 10113,
WriteRequestReceivedAtEPOCH -> 1647523790633,
WriteRequestProcessedAtEPOCH -> 1647523808379,
StagingDataFileSystemCheckDurationInMilliseconds -> 60,
command -> "COPY INTO [schema_name].[table_name] ...",
NumberOfRecordsStagedForSQLCommit -> 100,
DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
DataStagingSparkJobDurationInMilliseconds -> 5252,
rows_processed -> 100,
SaveModeApplied -> TRUNCATE_COPY,
DurationInMillisecondsToValidateFileFormat -> 75,
status -> Completed,
SparkApplicationName -> <spark_application_name>,
ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
JDBCConfigurationsSetupAtEPOCH -> 193,
StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
SchemaInferenceCheckDurationInMilliseconds -> 91,
SaveModeRequested -> Overwrite,
DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}
Meer codevoorbeelden
Gerealiseerde gegevens in cellen gebruiken
Spark DataFrame's createOrReplaceTempView
kunnen worden gebruikt voor toegang tot gegevens die zijn opgehaald in een andere cel door een tijdelijke weergave te registreren.
- Cel waarin gegevens worden opgehaald (bijvoorbeeld met voorkeur voor notebooktaal)
Scala
//Necessary imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
//Configure options and read from Synapse Dedicated SQL Pool.
val readDF = spark.read.
//Set Synapse Dedicated SQL End Point name.
option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
//Set database user name.
option(Constants.USER, "<user_name>").
//Set database user's password.
option(Constants.PASSWORD, "<user_password>").
//Set name of the data source definition that is defined with database scoped credentials.
option(Constants.DATA_SOURCE,"<data_source_name>").
//Set the three-part table name from which the read must be performed.
synapsesql("<database_name>.<schema_name>.<table_name>").
//Optional - specify number of records the DataFrame would read.
limit(10)
//Register the temporary view (scope - current active Spark Session)
readDF.createOrReplaceTempView("<temporary_view_name>")
- Wijzig nu de taalvoorkeur in het notitieblok in
PySpark (Python)
en haal gegevens op uit de geregistreerde weergave<temporary_view_name>
spark.sql("select * from <temporary_view_name>").show()
Verwerking van antwoorden
Aanroepen synapsesql
heeft twee mogelijke eindstatussen: geslaagd of mislukt. In deze sectie wordt beschreven hoe u het aanvraagantwoord voor elk scenario kunt verwerken.
Antwoord op aanvraag lezen
Na voltooiing wordt het leesantwoordfragment weergegeven in de uitvoer van de cel. Als de huidige cel niet werkt, worden ook volgende celuitvoeringen geannuleerd. Gedetailleerde foutinformatie is beschikbaar in de Spark-toepassingslogboeken.
Antwoord van schrijfaanvraag
Standaard wordt een schrijfantwoord afgedrukt naar de uitvoer van de cel. Bij een fout wordt de huidige cel gemarkeerd als mislukt en worden de volgende celuitvoeringen afgebroken. De andere methode is het doorgeven van de callback-handleoptie aan de synapsesql
methode. De callback-ingang biedt programmatische toegang tot het schrijfantwoord.
Overige overwegingen
- Wanneer u leest vanuit de tabellen met toegewezen SQL-pool van Azure Synapse:
- Overweeg om de benodigde filters toe te passen op het DataFrame om te profiteren van de functie voor kolomsnoeien van de Verbinding maken or.
- Leesscenario biedt geen ondersteuning voor de
TOP(n-rows)
component bij het inlijsten van deSELECT
query-instructies. De keuze om gegevens te beperken, is door de component limit(.) van het DataFrame te gebruiken.- Raadpleeg het voorbeeld: gerealiseerde gegevens gebruiken in de sectie cellen .
- Wanneer u naar de toegewezen SQL-pooltabellen van Azure Synapse schrijft:
- Voor interne tabeltypen:
- Tabellen worden gemaakt met ROUND_ROBIN gegevensdistributie.
- Kolomtypen worden afgeleid uit het DataFrame dat gegevens uit de bron zou lezen. Tekenreekskolommen worden toegewezen aan
NVARCHAR(4000)
.
- Voor externe tabeltypen:
- De initiële parallelle uitvoering van DataFrame bepaalt de gegevensorganisatie voor de externe tabel.
- Kolomtypen worden afgeleid uit het DataFrame dat gegevens uit de bron zou lezen.
- Een betere gegevensdistributie tussen uitvoerders kan worden bereikt door de parameter van het
spark.sql.files.maxPartitionBytes
DataFrame en de parameter vanrepartition
het DataFrame af te stemmen. - Bij het schrijven van grote gegevenssets is het belangrijk om rekening te houden met de impact van dwU-prestatieniveau dat de transactiegrootte beperkt.
- Voor interne tabeltypen:
- Bewaak azure Data Lake Storage Gen2-gebruikstrends om beperkingsgedrag te herkennen dat invloed kan hebben op de lees- en schrijfprestaties.