Databricks Connect
Note
Databricks recommends that you use either dbx or the Databricks extension for Visual Studio Code for local development instead of Databricks Connect.
Databricks Connect allows you to connect your favorite IDE (Eclipse, IntelliJ, PyCharm, RStudio, Visual Studio Code), notebook server (Jupyter Notebook, Zeppelin), and other custom applications to Azure Databricks clusters.
This article explains how Databricks Connect works, walks you through the steps to get started with Databricks Connect, explains how to troubleshoot issues that may arise when using Databricks Connect, and describes the differences between running with Databricks Connect and running in an Azure Databricks notebook.
Overview
Databricks Connect is a client library for Databricks Runtime. It allows you to write jobs using Spark APIs and run them remotely on an Azure Databricks cluster instead of in the local Spark session.
For example, when you run the DataFrame command spark.read.format("parquet").load(...).groupBy(...).agg(...).show() using Databricks Connect, the parsing and planning of the job runs on your local machine. Then, the logical representation of the job is sent to the Spark server running in Azure Databricks for execution in the cluster.
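As a minimal sketch of that flow (assuming you have already completed the client setup described later in this article; the Parquet path and column name are hypothetical placeholders):
from pyspark.sql import SparkSession

# With Databricks Connect configured, getOrCreate() returns a session whose
# jobs execute on the remote Azure Databricks cluster.
spark = SparkSession.builder.getOrCreate()

# Parsing and planning happen locally; execution happens on the cluster.
spark.read.format("parquet") \
    .load("/tmp/events") \
    .groupBy("event_type") \
    .count() \
    .show()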
With Databricks Connect, you can:
- Run large-scale Spark jobs from any Python, Java, Scala, or R application. Anywhere you can import pyspark, import org.apache.spark, or require(SparkR), you can now run Spark jobs directly from your application, without needing to install any IDE plugins or use Spark submission scripts.
- Step through and debug code in your IDE even when working with a remote cluster.
- Iterate quickly when developing libraries. You do not need to restart the cluster after changing Python or Java library dependencies in Databricks Connect, because each client session is isolated from the others in the cluster.
- Shut down idle clusters without losing work. Because the client application is decoupled from the cluster, it is unaffected by cluster restarts or upgrades, which would normally cause you to lose all the variables, RDDs, and DataFrame objects defined in a notebook.
Note
For Python development with SQL queries, Databricks recommends that you use the Databricks SQL Connector for Python instead of Databricks Connect. The Databricks SQL Connector for Python is easier to set up than Databricks Connect. Also, with Databricks Connect, jobs are parsed and planned on your local machine but run on remote compute resources, which can make runtime errors especially difficult to debug. The Databricks SQL Connector for Python submits SQL queries directly to remote compute resources and fetches the results.
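For comparison, here is a minimal sketch of a query using the Databricks SQL Connector for Python. It assumes the databricks-sql-connector package is installed; the hostname, HTTP path, and token values are placeholders that you would copy from your compute resource's connection details.
from databricks import sql

connection = sql.connect(server_hostname="<workspace-hostname>",
                         http_path="<http-path>",
                         access_token="<personal-access-token>")
cursor = connection.cursor()
cursor.execute("SELECT 1 AS value")   # the query runs on the remote compute resource
print(cursor.fetchall())              # results are fetched back to the client
cursor.close()
connection.close()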
Requirements
Only the following Databricks Runtime versions are supported:
- Databricks Runtime 11.3 LTS ML, Databricks Runtime 11.3 LTS
- Databricks Runtime 10.4 LTS ML, Databricks Runtime 10.4 LTS
- Databricks Runtime 9.1 LTS ML, Databricks Runtime 9.1 LTS
- Databricks Runtime 7.3 LTS ML, Databricks Runtime 7.3 LTS
The minor version of your client Python installation must be the same as the minor Python version of your Azure Databricks cluster. The following list shows the Python version installed with each Databricks Runtime:
- 11.3 LTS ML, 11.3 LTS: Python 3.9
- 10.4 LTS ML, 10.4 LTS: Python 3.8
- 9.1 LTS ML, 9.1 LTS: Python 3.8
- 7.3 LTS ML, 7.3 LTS: Python 3.7
For example, if you’re using Conda on your local development environment and your cluster is running Python 3.7, you must create an environment with that version, for example:
conda create --name dbconnect python=3.7
The Databricks Connect major and minor package version must always match your Databricks Runtime version. Databricks recommends that you always use the most recent package of Databricks Connect that matches your Databricks Runtime version. For example, when using a Databricks Runtime 7.3 LTS cluster, use the databricks-connect==7.3.* package.
Note
See the Databricks Connect release notes for a list of available Databricks Connect releases and maintenance updates.
Java Runtime Environment (JRE) 8. The client has been tested with the OpenJDK 8 JRE. The client does not support Java 11.
Note
On Windows, if you see an error that Databricks Connect cannot find winutils.exe, see Cannot find winutils.exe on Windows.
Set up the client
Note
Before you begin to set up the Databricks Connect client, you must meet the requirements for Databricks Connect.
Step 1: Install the client
Uninstall PySpark. This is required because the databricks-connect package conflicts with PySpark. For details, see Conflicting PySpark installations.
pip uninstall pyspark
Install the Databricks Connect client.
pip install -U "databricks-connect==7.3.*" # or X.Y.* to match your cluster version.
Note
Always specify databricks-connect==X.Y.* instead of databricks-connect=X.Y, to make sure that the newest package is installed.
Step 2: Configure connection properties
Collect the following configuration properties:
Azure Databricks workspace URL.
Azure Databricks personal access token or an Azure Active Directory token.
- For Azure Data Lake Storage (ADLS) credential passthrough, you must use an Azure Active Directory token. Azure Active Directory credential passthrough is supported only on Standard clusters running Databricks Runtime 7.3 LTS and above, and is not compatible with service principal authentication.
- For more information about authentication with Azure Active Directory tokens, see Authentication using Azure Active Directory tokens.
The ID of the cluster you created. You can obtain the cluster ID from the cluster URL; for example, if the cluster URL contains /clusters/1108-201635-xxxxxxxx, the cluster ID is 1108-201635-xxxxxxxx.
The unique organization ID for your workspace. See Get identifiers for workspace assets.
The port that Databricks Connect connects to. The default port is 15001. If your cluster is configured to use a different port, such as 8787, which was given in previous instructions for Azure Databricks, use the configured port number.
Configure the connection. You can use the CLI, SQL configs, or environment variables. The precedence of configuration methods from highest to lowest is: SQL config keys, CLI, and environment variables.
- CLI
Run databricks-connect configure:
databricks-connect configure
The license displays:
Copyright (2018) Databricks, Inc. This library (the "Software") may not be used except in connection with the Licensee's use of the Databricks Platform Services pursuant to an Agreement ...
Accept the license and supply configuration values. For Databricks Host and Databricks Token, enter the workspace URL and the personal access token you noted in Step 1.
Do you accept the above agreement? [y/N] y
Set new config values (leave input empty to accept default):
Databricks Host [no current value, must start with https://]: <databricks-url>
Databricks Token [no current value]: <databricks-token>
Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id>
Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id>
Port [15001]: <port>
If you get a message that the Azure Active Directory token is too long, you can leave the Databricks Token field empty and manually enter the token in ~/.databricks-connect.
SQL configs or environment variables. The following list shows the SQL config keys and the environment variables that correspond to the configuration properties you noted in Step 1. To set a SQL config key, use sql("set config=value"). For example: sql("set spark.databricks.service.clusterId=0304-201045-abcdefgh").
- Databricks Host: SQL config key spark.databricks.service.address; environment variable DATABRICKS_ADDRESS
- Databricks Token: SQL config key spark.databricks.service.token; environment variable DATABRICKS_API_TOKEN
- Cluster ID: SQL config key spark.databricks.service.clusterId; environment variable DATABRICKS_CLUSTER_ID
- Org ID: SQL config key spark.databricks.service.orgId; environment variable DATABRICKS_ORG_ID
- Port: SQL config key spark.databricks.service.port; environment variable DATABRICKS_PORT
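For example, here is a minimal sketch of supplying the settings through environment variables from Python before the Spark session is created. The placeholder values are hypothetical; in practice you may prefer to set the variables in your shell or IDE run configuration instead.
import os
from pyspark.sql import SparkSession

# Hypothetical placeholder values; replace them with your own workspace details.
os.environ["DATABRICKS_ADDRESS"] = "https://<databricks-instance>"
os.environ["DATABRICKS_API_TOKEN"] = "<databricks-token>"
os.environ["DATABRICKS_CLUSTER_ID"] = "<cluster-id>"
os.environ["DATABRICKS_ORG_ID"] = "<org-id>"
os.environ["DATABRICKS_PORT"] = "15001"

spark = SparkSession.builder.getOrCreate()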
Test connectivity to Azure Databricks.
databricks-connect test
If the cluster you configured is not running, the test starts the cluster, which remains running until its configured autotermination time. The output should look something like the following:
* PySpark is installed at /.../3.5.6/lib/python3.5/site-packages/pyspark
* Checking java version
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
* Testing scala command
18/12/10 16:38:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/12/10 16:38:50 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.range(100).reduce(_ + _)
Spark context Web UI available at https://10.8.5.214:4040
Spark context available as 'sc' (master = local[*], app id = local-1544488730553).
Spark session available as 'spark'.
View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi
res0: Long = 4950

scala> :quit

* Testing python command
18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
Set up your IDE or notebook server
This section describes how to configure your preferred IDE or notebook server to use the Databricks Connect client.
In this section:
- Jupyter notebook
- PyCharm
- SparkR and RStudio Desktop
- sparklyr and RStudio Desktop
- IntelliJ (Scala or Java)
- Eclipse
- Visual Studio Code
- SBT
Jupyter notebook
Note
Databricks recommends that you use either dbx or the Databricks extension for Visual Studio Code for local development instead of Databricks Connect.
Before you begin to use Databricks Connect, you must meet the requirements and set up the client for Databricks Connect.
The Databricks Connect configuration script automatically adds the package to your project configuration. To get started in a Python kernel, run:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
To enable the %sql shorthand for running and visualizing SQL queries, use the following snippet:
from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class

@magics_class
class DatabricksConnectMagics(Magics):

    @line_cell_magic
    def sql(self, line, cell=None):
        if cell and line:
            raise ValueError("Line must be empty for cell magic", line)
        try:
            from autovizwidget.widget.utils import display_dataframe
        except ImportError:
            print("Please run `pip install autovizwidget` to enable the visualization widget.")
            display_dataframe = lambda x: x
        return display_dataframe(self.get_spark().sql(cell or line).toPandas())

    def get_spark(self):
        user_ns = get_ipython().user_ns
        if "spark" in user_ns:
            return user_ns["spark"]
        else:
            from pyspark.sql import SparkSession
            user_ns["spark"] = SparkSession.builder.getOrCreate()
            return user_ns["spark"]

ip = get_ipython()
ip.register_magics(DatabricksConnectMagics)
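After the magic is registered, you can run a query directly in a notebook cell; for example (the table name here is a placeholder):
%sql SELECT * FROM demo_temps_table LIMIT 10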
PyCharm
Note
Databricks recommends that you use either dbx or the Databricks extension for Visual Studio Code for local development instead of Databricks Connect.
Before you begin to use Databricks Connect, you must meet the requirements and set up the client for Databricks Connect.
The Databricks Connect configuration script automatically adds the package to your project configuration.
Python 3 clusters
When you create a PyCharm project, select Existing Interpreter. From the drop-down menu, select the Conda environment you created (see Requirements).
Go to Run > Edit Configurations.
Add PYSPARK_PYTHON=python3 as an environment variable.
SparkR and RStudio Desktop
Note
Databricks recommends that you use either dbx or the Databricks extension for Visual Studio Code for local development instead of Databricks Connect.
Before you begin to use Databricks Connect, you must meet the requirements and set up the client for Databricks Connect.
Download and unpack the open source Spark onto your local machine. Choose the same version as in your Azure Databricks cluster (Hadoop 2.7).
Run databricks-connect get-jar-dir. This command returns a path like /usr/local/lib/python3.5/dist-packages/pyspark/jars. Copy the file path of one directory above the JAR directory file path, for example, /usr/local/lib/python3.5/dist-packages/pyspark, which is the SPARK_HOME directory.
Configure the Spark lib path and Spark home by adding them to the top of your R script. Set <spark-lib-path> to the directory where you unpacked the open source Spark package in step 1. Set <spark-home-path> to the Databricks Connect directory from step 2.
# Point to the OSS package path, e.g., /path/to/.../spark-2.4.0-bin-hadoop2.7
library(SparkR, lib.loc = .libPaths(c(file.path('<spark-lib-path>', 'R', 'lib'), .libPaths())))

# Point to the Databricks Connect PySpark installation, e.g., /path/to/.../pyspark
Sys.setenv(SPARK_HOME = "<spark-home-path>")
Initiate a Spark session and start running SparkR commands.
sparkR.session()

df <- as.DataFrame(faithful)
head(df)

df1 <- dapply(df, function(x) { x }, schema(df))
collect(df1)
sparklyr and RStudio Desktop
Important
This feature is in Public Preview.
Note
Databricks recommends that you use either dbx or the Databricks extension for Visual Studio Code for local development instead of Databricks Connect.
Before you begin to use Databricks Connect, you must meet the requirements and set up the client for Databricks Connect.
You can copy sparklyr-dependent code that you’ve developed locally using Databricks Connect and run it in an Azure Databricks notebook or hosted RStudio Server in your Azure Databricks workspace with minimal or no code changes.
In this section:
- Requirements
- Install, configure, and use sparklyr
- Resources
- sparklyr and RStudio Desktop limitations
Requirements
- sparklyr 1.2 or above.
- Databricks Runtime 7.3 or above with matching Databricks Connect.
Install, configure, and use sparklyr
In RStudio Desktop, install sparklyr 1.2 or above from CRAN or install the latest master version from GitHub.
# Install from CRAN
install.packages("sparklyr")

# Or install the latest master version from GitHub
install.packages("devtools")
devtools::install_github("sparklyr/sparklyr")
Activate the Python environment with Databricks Connect installed and run the following command in the terminal to get the <spark-home-path>:
databricks-connect get-spark-home
Initiate a Spark session and start running sparklyr commands.
library(sparklyr)
sc <- spark_connect(method = "databricks", spark_home = "<spark-home-path>")
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
library(dplyr)
src_tbls(sc)
iris_tbl %>% count
Close the connection.
spark_disconnect(sc)
Resources
For more information, see the sparklyr GitHub README.
For code examples, see sparklyr.
sparklyr and RStudio Desktop limitations
The following features are unsupported:
- sparklyr streaming APIs
- sparklyr ML APIs
- broom APIs
- csv_file serialization mode
- spark submit
IntelliJ (Scala or Java)
Note
Databricks recommends that you use either dbx or the Databricks extension for Visual Studio Code for local development instead of Databricks Connect.
Before you begin to use Databricks Connect, you must meet the requirements and set up the client for Databricks Connect.
Run databricks-connect get-jar-dir.
Point the dependencies to the directory returned from the command. Go to File > Project Structure > Modules > Dependencies > ‘+’ sign > JARs or Directories.
To avoid conflicts, we strongly recommend removing any other Spark installations from your classpath. If this is not possible, make sure that the JARs you add are at the front of the classpath. In particular, they must be ahead of any other installed version of Spark (otherwise you will either use one of those other Spark versions and run locally or throw a ClassDefNotFoundError).
Check the setting of the breakout option in IntelliJ. The default is All and will cause network timeouts if you set breakpoints for debugging. Set it to Thread to avoid stopping the background network threads.
Eclipse
Note
Databricks recommends that you use either dbx or the Databricks extension for Visual Studio Code for local development instead of Databricks Connect.
Before you begin to use Databricks Connect, you must meet the requirements and set up the client for Databricks Connect.
Run databricks-connect get-jar-dir.
Point the external JARs configuration to the directory returned from the command. Go to Project menu > Properties > Java Build Path > Libraries > Add External Jars.
To avoid conflicts, we strongly recommend removing any other Spark installations from your classpath. If this is not possible, make sure that the JARs you add are at the front of the classpath. In particular, they must be ahead of any other installed version of Spark (otherwise you will either use one of those other Spark versions and run locally or throw a ClassDefNotFoundError).
Visual Studio Code
Note
Databricks recommends that you use either dbx or the Databricks extension for Visual Studio Code for local development instead of Databricks Connect.
Before you begin to use Databricks Connect, you must meet the requirements and set up the client for Databricks Connect.
Verify that the Python extension is installed.
Open the Command Palette (Command+Shift+P on macOS and Ctrl+Shift+P on Windows/Linux).
Select a Python interpreter. Go to Code > Preferences > Settings, and choose python settings.
Run databricks-connect get-jar-dir.
Add the directory returned from the command to the User Settings JSON under python.venvPath. This should be added to the Python Configuration.
Disable the linter. Click the … on the right side and edit the JSON settings.
If running with a virtual environment, which is the recommended way to develop for Python in VS Code, in the Command Palette type select python interpreter and point to the environment that matches your cluster Python version. For example, if your cluster is Python 3.5, your local environment should be Python 3.5.
SBT
Note
Databricks recommends that you use either dbx or the Databricks extension for Visual Studio Code for local development instead of Databricks Connect.
Before you begin to use Databricks Connect, you must meet the requirements and set up the client for Databricks Connect.
To use SBT, you must configure your build.sbt file to link against the Databricks Connect JARs instead of the usual Spark library dependency. You do this with the unmanagedBase directive in the following example build file, which assumes a Scala app that has a com.example.Test main object:
build.sbt
name := "hello-world"
version := "1.0"
scalaVersion := "2.11.6"
// this should be set to the path returned by ``databricks-connect get-jar-dir``
unmanagedBase := new java.io.File("/usr/local/lib/python2.7/dist-packages/pyspark/jars")
mainClass := Some("com.example.Test")
Run examples from your IDE
Java
import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;
public class App {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
            .builder()
            .appName("Temps Demo")
            .config("spark.master", "local")
            .getOrCreate();

        // Create a Spark DataFrame consisting of high and low temperatures
        // by airport code and date.
        StructType schema = new StructType(new StructField[] {
            new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
            new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
            new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
            new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
        });

        List<Row> dataList = new ArrayList<Row>();
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));

        Dataset<Row> temps = spark.createDataFrame(dataList, schema);

        // Create a table on the Databricks cluster and then fill
        // the table with the DataFrame's contents.
        // If the table already exists from a previous run,
        // delete it first.
        spark.sql("USE default");
        spark.sql("DROP TABLE IF EXISTS demo_temps_table");
        temps.write().saveAsTable("demo_temps_table");

        // Query the table on the Databricks cluster, returning rows
        // where the airport code is not BLI and the date is later
        // than 2021-04-01. Group the results and order by high
        // temperature in descending order.
        Dataset<Row> df_temps = spark.sql("SELECT * FROM demo_temps_table " +
            "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
            "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
            "ORDER BY TempHighF DESC");
        df_temps.show();

        // Results:
        //
        // +-----------+----------+---------+--------+
        // |AirportCode|      Date|TempHighF|TempLowF|
        // +-----------+----------+---------+--------+
        // |        PDX|2021-04-03|       64|      45|
        // |        PDX|2021-04-02|       61|      41|
        // |        SEA|2021-04-03|       57|      43|
        // |        SEA|2021-04-02|       54|      39|
        // +-----------+----------+---------+--------+

        // Clean up by deleting the table from the Databricks cluster.
        spark.sql("DROP TABLE demo_temps_table");
    }
}
Python
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date
spark = SparkSession.builder.appName('temps-demo').getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS demo_temps_table')
temps.write.saveAsTable('demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE demo_temps_table')
Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date
object Demo {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.master("local").getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(Array(
      StructField("AirportCode", StringType, false),
      StructField("Date", DateType, false),
      StructField("TempHighF", IntegerType, false),
      StructField("TempLowF", IntegerType, false)
    ))
    val data = List(
      Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
      Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
      Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
      Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
      Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
      Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
      Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
      Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
      Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
    )
    val rdd = spark.sparkContext.makeRDD(data)
    val temps = spark.createDataFrame(rdd, schema)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS demo_temps_table")
    temps.write.saveAsTable("demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01. Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    //
    // +-----------+----------+---------+--------+
    // |AirportCode|      Date|TempHighF|TempLowF|
    // +-----------+----------+---------+--------+
    // |        PDX|2021-04-03|       64|      45|
    // |        PDX|2021-04-02|       61|      41|
    // |        SEA|2021-04-03|       57|      43|
    // |        SEA|2021-04-02|       54|      39|
    // +-----------+----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE demo_temps_table")
  }
}
Work with dependencies
Typically your main class or Python file will have other dependency JARs and files. You can add such dependency JARs and files by calling sparkContext.addJar("path-to-the-jar") or sparkContext.addPyFile("path-to-the-file"). You can also add Egg files and zip files with the addPyFile() interface. Every time you run the code in your IDE, the dependency JARs and files are installed on the cluster.
Python
from lib import Foo
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
#sc.setLogLevel("INFO")
print("Testing simple count")
print(spark.range(100).count())
print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())
# Contents of lib.py (shipped to the cluster with addPyFile above):
class Foo(object):
    def __init__(self, x):
        self.x = x
Python + Java UDFs
from pyspark.sql import SparkSession
from pyspark.sql.column import _to_java_column, _to_seq, Column
## In this example, udf.jar contains compiled Java / Scala UDFs:
#package com.example
#
#import org.apache.spark.sql._
#import org.apache.spark.sql.expressions._
#import org.apache.spark.sql.functions.udf
#
#object Test {
# val plusOne: UserDefinedFunction = udf((i: Long) => i + 1)
#}
spark = SparkSession.builder \
.config("spark.jars", "/path/to/udf.jar") \
.getOrCreate()
sc = spark.sparkContext
def plus_one_udf(col):
    f = sc._jvm.com.example.Test.plusOne()
    return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
sc._jsc.addJar("/path/to/udf.jar")
spark.range(100).withColumn("plusOne", plus_one_udf("id")).show()
Scala
package com.example
import org.apache.spark.sql.SparkSession
case class Foo(x: String)
object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      ...
      .getOrCreate();

    spark.sparkContext.setLogLevel("INFO")

    println("Running simple show query...")
    spark.read.format("parquet").load("/tmp/x").show()

    println("Running simple UDF query...")
    spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
    spark.udf.register("f", (x: Int) => x + 1)
    spark.range(10).selectExpr("f(id)").show()

    println("Running custom objects query...")
    val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
    println(objs.toSeq)
  }
}
Access DBUtils
You can use the dbutils.fs and dbutils.secrets utilities of the Databricks Utilities module.
Supported commands are dbutils.fs.cp, dbutils.fs.head, dbutils.fs.ls, dbutils.fs.mkdirs, dbutils.fs.mv, dbutils.fs.put, dbutils.fs.rm, dbutils.secrets.get, dbutils.secrets.getBytes, dbutils.secrets.list, and dbutils.secrets.listScopes.
See File system utility (dbutils.fs) or run dbutils.fs.help(), and Secrets utility (dbutils.secrets) or run dbutils.secrets.help().
Python
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())
When using Databricks Runtime 7.3 LTS or above, to access the DBUtils module in a way that works both locally and in Azure Databricks clusters, use the following get_dbutils():
def get_dbutils(spark):
    from pyspark.dbutils import DBUtils
    return DBUtils(spark)
Otherwise, use the following get_dbutils():
def get_dbutils(spark):
    if spark.conf.get("spark.databricks.service.client.enabled") == "true":
        from pyspark.dbutils import DBUtils
        return DBUtils(spark)
    else:
        import IPython
        return IPython.get_ipython().user_ns["dbutils"]
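Either helper can then be used in the same way as the earlier example (assuming an existing SparkSession named spark):
dbutils = get_dbutils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())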
Scala
val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())
Copying files between local and remote filesystems
You can use dbutils.fs to copy files between your client and remote filesystems. The file:/ scheme refers to the local filesystem on the client.
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')
The maximum file size that can be transferred that way is 250 MB.
Enable dbutils.secrets.get
Because of security restrictions, the ability to call dbutils.secrets.get is disabled by default. Contact Azure Databricks support to enable this feature for your workspace.
Access the Hadoop filesystem
You can also access DBFS directly using the standard Hadoop filesystem interface:
> import org.apache.hadoop.fs._
// get new DBFS connection
> val dbfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
dbfs: org.apache.hadoop.fs.FileSystem = com.databricks.backend.daemon.data.client.DBFS@2d036335
// list files
> dbfs.listStatus(new Path("dbfs:/"))
res1: Array[org.apache.hadoop.fs.FileStatus] = Array(FileStatus{path=dbfs:/$; isDirectory=true; ...})
// open file
> val stream = dbfs.open(new Path("dbfs:/path/to/your_file"))
stream: org.apache.hadoop.fs.FSDataInputStream = org.apache.hadoop.fs.FSDataInputStream@7aa4ef24
// get file contents as string
> import org.apache.commons.io._
> println(new String(IOUtils.toByteArray(stream)))
Set Hadoop configurations
On the client you can set Hadoop configurations using the spark.conf.set API, which applies to SQL and DataFrame operations. Hadoop configurations set on the sparkContext must be set in the cluster configuration or using a notebook. This is because configurations set on sparkContext are not tied to user sessions but apply to the entire cluster.
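For example, here is a minimal sketch of setting an Azure storage account key as a Hadoop configuration for DataFrame and SQL operations, reusing the spark and dbutils objects from the Access DBUtils example. The storage account name and secret scope are hypothetical placeholders.
# Applies to SQL and DataFrame operations run through this session.
spark.conf.set(
    "fs.azure.account.key.<storage-account>.dfs.core.windows.net",
    dbutils.secrets.get(scope="<scope-name>", key="<key-name>"))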
Troubleshooting
Run databricks-connect test to check for connectivity issues. This section describes some common issues you may encounter and how to resolve them.
Python version mismatch
Check that the Python version you are using locally has at least the same minor release as the version on the cluster (for example, 3.5.1 versus 3.5.2 is OK, 3.5 versus 3.6 is not).
If you have multiple Python versions installed locally, ensure that Databricks Connect is using the right one by setting the PYSPARK_PYTHON environment variable (for example, PYSPARK_PYTHON=python3).
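A quick way to check the minor version of the local interpreter (a simple sketch; compare the result with the cluster's Python version listed in Requirements):
import sys
print(".".join(map(str, sys.version_info[:2])))  # for example, 3.7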
Server not enabled
Ensure the cluster has the Spark server enabled with spark.databricks.service.server.enabled true. You should see the following lines in the driver log if it is:
18/10/25 21:39:18 INFO SparkConfUtils$: Set spark config:
spark.databricks.service.server.enabled -> true
...
18/10/25 21:39:21 INFO SparkContext: Loading Spark Service RPC Server
18/10/25 21:39:21 INFO SparkServiceRPCServer:
Starting Spark Service RPC Server
18/10/25 21:39:21 INFO Server: jetty-9.3.20.v20170531
18/10/25 21:39:21 INFO AbstractConnector: Started ServerConnector@6a6c7f42
{HTTP/1.1,[http/1.1]}{0.0.0.0:15001}
18/10/25 21:39:21 INFO Server: Started @5879ms
Conflicting PySpark installations
The databricks-connect package conflicts with PySpark. Having both installed will cause errors when initializing the Spark context in Python. This can manifest in several ways, including “stream corrupted” or “class not found” errors. If you have PySpark installed in your Python environment, ensure it is uninstalled before installing databricks-connect. After uninstalling PySpark, make sure to fully re-install the Databricks Connect package:
pip uninstall pyspark
pip uninstall databricks-connect
pip install -U "databricks-connect==9.1.*" # or X.Y.* to match your cluster version.
Conflicting SPARK_HOME
If you have previously used Spark on your machine, your IDE may be configured to use one of those other versions of Spark rather than the Databricks Connect Spark. This can manifest in several ways, including “stream corrupted” or “class not found” errors. You can see which version of Spark is being used by checking the value of the SPARK_HOME environment variable:
Java
System.out.println(System.getenv("SPARK_HOME"));
Python
import os
print(os.environ['SPARK_HOME'])
Scala
println(sys.env.get("SPARK_HOME"))
Resolution
If SPARK_HOME is set to a version of Spark other than the one in the client, you should unset the SPARK_HOME variable and try again.
Check your IDE environment variable settings, your .bashrc, .zshrc, or .bash_profile file, and anywhere else environment variables might be set. You will most likely have to quit and restart your IDE to purge the old state, and you may even need to create a new project if the problem persists.
You should not need to set SPARK_HOME to a new value; unsetting it should be sufficient.
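If you prefer, here is a minimal sketch of clearing the variable from within a running Python process; in most cases, though, removing it from your shell profile or IDE settings is the better fix.
import os
os.environ.pop("SPARK_HOME", None)  # remove any stale value for this process only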
Conflicting or missing PATH entry for binaries
It is possible your PATH is configured so that commands like spark-shell will be running some other, previously installed binary instead of the one provided with Databricks Connect. This can cause databricks-connect test to fail. You should make sure either that the Databricks Connect binaries take precedence or remove the previously installed ones.
If you can’t run commands like spark-shell, it is also possible your PATH was not automatically set up by pip install and you’ll need to add the installation bin dir to your PATH manually. It’s possible to use Databricks Connect with IDEs even if this isn’t set up. However, the databricks-connect test command will not work.
Conflicting serialization settings on the cluster
If you see “stream corrupted” errors when running databricks-connect test, this may be due to incompatible cluster serialization configs. For example, setting the spark.io.compression.codec config can cause this issue. To resolve this issue, consider removing these configs from the cluster settings, or setting the configuration in the Databricks Connect client.
Cannot find winutils.exe on Windows
If you are using Databricks Connect on Windows and see:
ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
Follow the instructions to configure the Hadoop path on Windows.
The filename, directory name, or volume label syntax is incorrect on Windows
If you are using Databricks Connect on Windows and see:
The filename, directory name, or volume label syntax is incorrect.
Either Java or Databricks Connect was installed into a directory with a space in your path. You can work around this by either installing into a directory path without spaces, or configuring your path using the short name form.
Authentication using Azure Active Directory tokens
When you use Databricks Connect, you can authenticate by using an Azure Active Directory token instead of a personal access token. Azure Active Directory tokens have a limited lifetime. When the Azure Active Directory token expires, Databricks Connect fails with an Invalid Token error.
In Databricks Connect 7.3.5 and above, you can provide the Azure Active Directory token in your running Databricks Connect application. Your application needs to obtain the new access token and set it to the spark.databricks.service.token SQL config key.
Python
spark.conf.set("spark.databricks.service.token", new_aad_token)
Scala
spark.conf.set("spark.databricks.service.token", newAADToken)
After you update the token, the application can continue to use the same SparkSession and any objects and state that are created in the context of the session. To avoid intermittent errors, Databricks recommends that you provide a new token before the old token expires.
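For example, here is a minimal sketch of refreshing the token with the azure-identity package (an assumption; use whatever Azure Active Directory client library your application already relies on). The scope shown is the commonly documented Azure Databricks resource ID; verify it for your environment.
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
new_aad_token = credential.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token

# Provide the refreshed token to the running Databricks Connect application.
spark.conf.set("spark.databricks.service.token", new_aad_token)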
You can extend the lifetime of the Azure Active Directory token to persist during the execution of your application. To do that, attach a TokenLifetimePolicy with an appropriately long lifetime to the Azure Active Directory authorization application that you used to acquire the access token.
Note
Azure Active Directory passthrough uses two tokens: the Azure Active Directory access token that was previously described that you configure in Databricks Connect, and the ADLS passthrough token for the specific resource that Databricks generates while Databricks processes the request. You cannot extend the lifetime of ADLS passthrough tokens by using Azure Active Directory token lifetime policies. If you send a command to the cluster that takes longer than an hour, it will fail if the command accesses an ADLS resource after the one hour mark.
Limitations
Databricks Connect does not support the following Azure Databricks features and third-party platforms:
- Structured Streaming.
- Running arbitrary code that is not a part of a Spark job on the remote cluster.
- Native Scala, Python, and R APIs for Delta table operations (for example, DeltaTable.forPath) are not supported. However, the SQL API (spark.sql(...)) with Delta Lake operations and the Spark API (for example, spark.read.load) on Delta tables are both supported.
- COPY INTO.
- Using SQL functions, or Python or Scala UDFs, that are part of the server’s catalog. However, locally introduced Scala and Python UDFs work.
- Apache Zeppelin 0.7.x and below.
- Connecting to clusters with table access control.
- Connecting to clusters with process isolation enabled (in other words, where spark.databricks.pyspark.enableProcessIsolation is set to true).
- The Delta CLONE SQL command.
- Global temporary views.
- CREATE TABLE table AS SELECT ... SQL commands do not always work. Instead, use spark.sql("SELECT ...").write.saveAsTable("table").
- Azure Active Directory credential passthrough is supported only on standard clusters running Databricks Runtime 7.3 LTS and above, and is not compatible with service principal authentication.
- Most Databricks Utilities. Only dbutils.fs and dbutils.secrets are supported; see Access DBUtils.