इसके माध्यम से साझा किया गया


Query data in Azure Synapse Analytics

You can access Azure Synapse from Azure Databricks using the Azure Synapse connector, which uses the COPY statement in Azure Synapse to transfer large volumes of data efficiently between an Azure Databricks cluster and an Azure Synapse instance using an Azure Data Lake Storage Gen2 storage account for temporary staging.

Important

The configurations described in this article are Experimental. Experimental features are provided as-is and are not supported by Databricks through customer technical support. To get full query federation support, you should instead use Lakehouse Federation, which enables your Azure Databricks users to take advantage of Unity Catalog syntax and data governance tools.

Azure Synapse Analytics is a cloud-based enterprise data warehouse that leverages massively parallel processing (MPP) to quickly run complex queries across petabytes of data.

Important

This connector is for use with Synapse Dedicated Pool instances only and is not compatible with other Synapse components.

Note

COPY is available only on Azure Data Lake Storage Gen2 instances. If you’re looking for details on working with Polybase, see Connecting Azure Databricks and Azure Synapse with PolyBase (legacy).

Example syntax for Synapse

You can query Synapse in Scala, Python, SQL, and R. The following code examples use storage account keys and forward the storage credentials from Azure Databricks to Synapse.

Note

Use the connection string provided by Azure portal, which enables Secure Sockets Layer (SSL) encryption for all data sent between the Spark driver and the Azure Synapse instance through the JDBC connection. To verify that the SSL encryption is enabled, you can search for encrypt=true in the connection string.

Important

External locations defined in Unity Catalog are not supported as tempDir locations.

Scala


// Set up the storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
val df: DataFrame = spark.read
  .format("sqldw")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 1433 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* If schemaName not provided, default to "dbo". */
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .load()

// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .load()

// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select x, count(*) as cnt from table group by x")
  .load()

// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.

df.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .save()

Python


# Set up the storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
df = spark.read
  .format("sqldw")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 1433 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # If schemaName not provided, default to "dbo".
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .load()

# Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .load()

# Load data from an Azure Synapse query.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("query", "select x, count(*) as cnt from table group by x") \
  .load()

# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.

df.write \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .save()

SQL


-- Set up the storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net=<your-storage-account-access-key>;

-- Read data using SQL. The following example applies to Databricks Runtime 11.3 LTS and above.
CREATE TABLE example_table_in_spark_read
USING sqldw
OPTIONS (
  host '<hostname>',
  port '<port>' /* Optional - will use default port 1433 if not specified. */
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* If schemaName not provided, default to "dbo". */
  forwardSparkAzureStorageCredentials 'true',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);

-- Read data using SQL. The following example applies to Databricks Runtime 10.4 LTS and below.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbtable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);

-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:

CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;

R

# Load SparkR
library(SparkR)

# Set up the storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>")

# Get some data from an Azure Synapse table.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forward_spark_azure_storage_credentials = "true",
   dbTable = "<your-table-name>",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")

# Load data from an Azure Synapse query.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forward_spark_azure_storage_credentials = "true",
   query = "select x, count(*) as cnt from table group by x",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")

# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.

write.df(
  df,
  source = "com.databricks.spark.sqldw",
  url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
  forward_spark_azure_storage_credentials = "true",
  dbTable = "<your-table-name>",
  tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")

How does authentication between Azure Databricks and Synapse work?

The Azure Synapse connector uses three types of network connections:

  • Spark driver to Azure Synapse
  • Spark cluster to Azure storage account
  • Azure Synapse to Azure storage account

Configuring access to Azure storage

Both Azure Databricks and Synapse need privileged access to an Azure storage account to be used for temporary data storage.

Azure Synapse does not support using SAS for storage account access. You can configure access for both services by doing one of the following:

Required Azure Synapse permissions

Because it uses COPY in the background, the Azure Synapse connector requires the JDBC connection user to have permission to run the following commands in the connected Azure Synapse instance:

If the destination table does not exist in Azure Synapse, permission to run the following command is required in addition to the command above:

The following table summarizes the permissions required for writes with COPY:

Permissions (insert into an existing table) Permissions (insert into a new table)
ADMINISTER DATABASE BULK OPERATIONS

INSERT
ADMINISTER DATABASE BULK OPERATIONS

INSERT

CREATE TABLE

ALTER ON SCHEMA :: dbo

Networking configurations

If you configure a firewall on Azure Synapse, you must configure network settings to allow Azure Databricks to reach Azure Synapse. First, ensure that your Azure Databricks workspace is deployed in your own virtual network following Deploy Azure Databricks in your Azure virtual network (VNet injection). You can then configure IP firewall rules on Azure Synpase to allow connections from your subnets to your Synapse account. See Azure Synapse Analytics IP firewall rules.

Configure connection from Azure Databricks to Synapse with OAuth 2.0 with a service principal

You can authenticate to Azure Synapse Analytics using a service principal with access to the underlying storage account. For more information on using service principal credentials to access an Azure storage account, see Connect to Azure Data Lake Storage Gen2 and Blob Storage. You must set the enableServicePrincipalAuth option to true in the connection configuration Azure Databricks Synapse connector options reference to enable the connector to authenticate with a service principal.

You can optionally use a different service principal for the Azure Synapse Analytics connection. The following example configures service principal credentials for the storage account and optional service principal credentials for Synapse:

ini

; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token

; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>

Scala

// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

Python

# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

R

# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")

# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

Supported save modes for batch writes

The Azure Synapse connector supports ErrorIfExists, Ignore, Append, and Overwrite save modes with the default mode being ErrorIfExists. For more information on supported save modes in Apache Spark, see Spark SQL documentation on Save Modes.

Azure Databricks Synapse connector options reference

The OPTIONS provided in Spark SQL support the following settings:

Parameter Required Default Notes
dbTable Yes, unless query is specified No default The table to create or read from in Azure Synapse. This parameter is required when saving data back to Azure Synapse.

You can also use {SCHEMA NAME}.{TABLE NAME} to access a table in a given schema. If schema name is not provided, the default schema associated with the JDBC user is used.

The previously supported dbtable variant is deprecated and will be ignored in future releases. Use the “camel case” name instead.
query Yes, unless dbTable is specified No default The query to read from in Azure Synapse.

For tables referred in the query, you can also use {SCHEMA NAME}.{TABLE NAME} to access a table in a given schema. If schema name is not provided, the default schema associated with the JDBC user is used.
user No No default The Azure Synapse username. Must be used in tandem with password option. Can only be used if the user and password are not passed in the URL. Passing both will result in an error.
password No No default The Azure Synapse password. Must be used in tandem with user option. Can only be used if the user and password are not passed in the URL. Passing both will result in an error.
url Yes No default A JDBC URL with sqlserver set as the subprotocol. It is recommended to use the connection string provided by Azure portal. Setting
encrypt=true is strongly recommended, because it enables SSL encryption of the JDBC connection. If user and password are set separately, you do not need to include them in the URL.
jdbcDriver No Determined by the JDBC URL’s subprotocol The class name of the JDBC driver to use. This class must be on the classpath. In most cases, it should not be necessary to specify this option, as the appropriate driver classname should automatically be determined by the JDBC URL’s subprotocol.

The previously supported jdbc_driver variant is deprecated and will be ignored in future releases. Use the “camel case” name instead.
tempDir Yes No default A abfss URI. We recommend you use a dedicated Blob storage container for the Azure Synapse.

The previously supported tempdir variant is deprecated and will be ignored in future releases. Use the “camel case” name instead.

You cannot use an External location defined in Unity Catalog as a tempDir location.
tempCompression No SNAPPY The compression algorithm to be used to encode/decode temporary by both Spark and Azure Synapse. Currently supported values are: UNCOMPRESSED, SNAPPY and GZIP.
forwardSparkAzureStorageCredentials No false If true, the library automatically discovers the storage account access key credentials that Spark is using to connect to the Blob storage container and forwards those credentials to Azure Synapse over JDBC. These credentials are sent as part of the JDBC query. Therefore it is strongly recommended that you enable SSL encryption of the JDBC connection when you use this option.

When configuring storage authentication, you must set exactly one of useAzureMSI and forwardSparkAzureStorageCredentials to true. Alternatively, you can set enableServicePrincipalAuth to true and use service principal for both JDBC and storage authentication. The forwardSparkAzureStorageCredentials option does not support authentication to storage using either a managed service identity or service principal. Only storage account access key is supported.

The previously supported forward_spark_azure_storage_credentials variant is deprecated and will be ignored in future releases. Use the “camel case” name instead.
useAzureMSI No false If true, the library will specify IDENTITY = 'Managed Service Identity' and no SECRET for the database scoped credentials it creates.

When configuring storage authentication, you must set exactly one of useAzureMSI and forwardSparkAzureStorageCredentials to true. Alternatively, you can set enableServicePrincipalAuth to true and use service principal for both JDBC and storage authentication.
enableServicePrincipalAuth No false If true, the library will use the provided service principal credentials to connect to the Azure storage account and Azure Synapse Analytics over JDBC.

If either forward_spark_azure_storage_credentials or useAzureMSI is set to true, that option would take precedence over service principal in storage authentication.
tableOptions No CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN A string used to specify table options when creating the Azure Synapse table set through dbTable. This string is passed literally to the WITH clause of the CREATE TABLE SQL statement that is issued against Azure Synapse.

The previously supported table_options variant is deprecated and will be ignored in future releases. Use the “camel case” name instead.
preActions No No default (empty string) A ; separated list of SQL commands to be executed in Azure Synapse before writing data to the Azure Synapse instance. These SQL commands are required to be valid commands accepted by Azure Synapse.

If any of these commands fail, it is treated as an error and the write operation is not executed.
postActions No No default (empty string) A ; separated list of SQL commands to be executed in Azure Synapse after the connector successfully writes data to the Azure Synapse instance. These SQL commands are required to be valid commands accepted by Azure Synapse.

If any of these commands fail, it is treated as an error and you’ll get an exception after the data is successfully written to the Azure Synapse instance.
maxStrLength No 256 StringType in Spark is mapped to the NVARCHAR(maxStrLength) type in Azure Synapse. You can use maxStrLength to set the string length for all NVARCHAR(maxStrLength) type columns that are in the table with name
dbTable in Azure Synapse.

The previously supported maxstrlength variant is deprecated and will be ignored in future releases. Use the “camel case” name instead.
applicationName No Databricks-User-Query The tag of the connection for each query. If not specified or the value is an empty string, the default value of the tag is added the JDBC URL. The default value prevents the Azure DB Monitoring tool from raising spurious SQL injection alerts against queries.
maxbinlength No No default Control the column length of BinaryType columns. This parameter is translated as VARBINARY(maxbinlength).
identityInsert No false Setting to true enables IDENTITY_INSERT mode, which inserts a DataFrame provided value in the identity column of the Azure Synapse table.

See Explicitly inserting values into an IDENTITY column.
externalDataSource No No default A pre-provisioned external data source to read data from Azure Synapse. An external data source can only be used with PolyBase and removes the CONTROL permission requirement because the connector does not need to create a scoped credential and an external data source to load data.

For example usage and the list of permissions required when using an external data source, see Required Azure Synapse permissions for PolyBase with the external data source option.
maxErrors No 0 The maximum number of rows that can be rejected during reads and writes before the loading operation is cancelled. The rejected rows will be ignored. For example, if two out of ten records have errors, only eight records will be processed.

See REJECT_VALUE documentation in CREATE EXTERNAL TABLE and MAXERRORS documentation in COPY.
inferTimestampNTZType No false If true, values of type Azure Synapse TIMESTAMP are interpreted as TimestampNTZType (timestamp without time zone) during reads. Otherwise, all timestamps are interpreted as TimestampType regardless of the type in the underlying Azure Synapse table.

Note

  • tableOptions, preActions, postActions, and maxStrLength are relevant only when writing data from Azure Databricks to a new table in Azure Synapse.
  • Even though all data source option names are case-insensitive, we recommend that you specify them in “camel case” for clarity.

Query pushdown into Azure Synapse

The Azure Synapse connector implements a set of optimization rules to push the following operators down into Azure Synapse:

  • Filter
  • Project
  • Limit

The Project and Filter operators support the following expressions:

  • Most boolean logic operators
  • Comparisons
  • Basic arithmetic operations
  • Numeric and string casts

For the Limit operator, pushdown is supported only when there is no ordering specified. For example:

SELECT TOP(10) * FROM table, but not SELECT TOP(10) * FROM table ORDER BY col.

Note

The Azure Synapse connector does not push down expressions operating on strings, dates, or timestamps.

Query pushdown built with the Azure Synapse connector is enabled by default. You can disable it by setting spark.databricks.sqldw.pushdown to false.

Temporary data management

The Azure Synapse connector does not delete the temporary files that it creates in the Azure storage container. Databricks recommends that you periodically delete temporary files under the user-supplied tempDir location.

To facilitate data cleanup, the Azure Synapse connector does not store data files directly under tempDir, but instead creates a subdirectory of the form: <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/. You can set up periodic jobs (using the Azure Databricks jobs feature or otherwise) to recursively delete any subdirectories that are older than a given threshold (for example, 2 days), with the assumption that there cannot be Spark jobs running longer than that threshold.

A simpler alternative is to periodically drop the whole container and create a new one with the same name. This requires that you use a dedicated container for the temporary data produced by the Azure Synapse connector and that you can find a time window in which you can guarantee that no queries involving the connector are running.

Temporary object management

The Azure Synapse connector automates data transfer between an Azure Databricks cluster and an Azure Synapse instance. For reading data from an Azure Synapse table or query or writing data to an Azure Synapse table, the Azure Synapse connector creates temporary objects, including DATABASE SCOPED CREDENTIAL, EXTERNAL DATA SOURCE, EXTERNAL FILE FORMAT, and EXTERNAL TABLE behind the scenes. These objects live only throughout the duration of the corresponding Spark job and are automatically dropped.

When a cluster is running a query using the Azure Synapse connector, if the Spark driver process crashes or is forcefully restarted, or if the cluster is forcefully terminated or restarted, temporary objects might not be dropped. To facilitate identification and manual deletion of these objects, the Azure Synapse connector prefixes the names of all intermediate temporary objects created in the Azure Synapse instance with a tag of the form: tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>.

We recommend that you periodically look for leaked objects using queries such as the following:

  • SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'