What are user-defined functions (UDFs)?
User-defined functions (UDFs) allow you to reuse and share code that extends built-in functionality on Azure Databricks. Use UDFs to perform specific tasks, such as complex calculations, transformations, or custom data manipulations.
Note
On clusters with shared access mode, Python scalar UDFs are supported in Databricks Runtime 13.3 LTS and above, while Scala UDFs are supported in Databricks Runtime 14.2 and above.
Python scalar UDFs can be registered in Unity Catalog using SQL syntax in Databricks Runtime 13.3 LTS and above. See User-defined functions (UDFs) in Unity Catalog.
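For example, the following sketch registers a Python scalar UDF in Unity Catalog using SQL syntax from a notebook. The catalog and schema main.default are placeholders; substitute your own:
# Register a Python scalar UDF in Unity Catalog using SQL syntax
# (main.default is a placeholder catalog and schema)
spark.sql("""
CREATE OR REPLACE FUNCTION main.default.greet(name STRING)
RETURNS STRING
LANGUAGE PYTHON
AS $$
return f"Hello, {name}!"
$$
""")
spark.sql("SELECT main.default.greet('world')").show()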
When should you use a UDF?
Use UDFs for logic that is difficult to express with built-in Apache Spark functions. Built-in functions are optimized for distributed processing and generally offer better performance at scale. For more information, see Functions.
Databricks recommends UDFs for ad hoc queries, manual data cleansing, exploratory data analysis, and operations on small to medium-sized datasets. Common use cases for UDFs include data encryption and decryption, hashing, JSON parsing, and validation.
Use Apache Spark methods for operations on very large datasets and any workloads that are run regularly or continuously, including ETL jobs and streaming operations.
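For instance, the name-length calculation used in the scalar UDF example below is also available as the built-in length function. Where a built-in exists, prefer it; a minimal sketch, assuming a DataFrame df with a name column:
from pyspark.sql.functions import length

# Built-in equivalent of a string-length UDF; runs entirely inside the JVM
df = df.withColumn("name_length", length("name"))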
Registered and session-scoped UDFs
UDFs created using SQL are registered in Unity Catalog and have associated permissions, whereas UDFs created within your notebook are session-based and are scoped to the current SparkSession.
You can define and access session-based UDFs using any language supported by Azure Databricks. UDFs can be scalar or non-scalar.
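As a sketch of session scoping, a Python UDF registered with spark.udf.register in a notebook is callable from both Python and SQL in the current session, but is gone once the session ends (the function name squared is illustrative):
from pyspark.sql.types import IntegerType

# Session-scoped UDF: visible only within the current SparkSession
spark.udf.register("squared", lambda x: x * x, IntegerType())
spark.sql("SELECT squared(4) AS result").show()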
Note
Currently, only SQL and Python scalar UDFs registered in Unity Catalog are available in Databricks SQL (DBSQL).
Scalar UDFs
Scalar UDFs operate on a single row and return a single value for each row. The following example uses a scalar UDF to calculate the length of each name in a name column and add the value in a new column, name_length:
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
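The examples that follow assume this data is available as a DataFrame df and as a table your_table. One way to set that up is with a temporary view (a sketch):
# Sample data used throughout this section
df = spark.createDataFrame(
    [("alice", 10.0), ("bob", 20.0), ("carol", 30.0), ("dave", 40.0), ("eve", 50.0)],
    ["name", "score"],
)
df.createOrReplaceTempView("your_table")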
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
To implement this in a Databricks notebook using PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# Define the UDF that computes the length of a name
@udf(returnType=IntegerType())
def get_name_length(name):
    return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
For more information, see User-defined functions (UDFs) in Unity Catalog and User-defined scalar functions - Python.
User-defined aggregate functions (UDAFs)
User-defined aggregate functions (UDAFs) operate on multiple rows and return a single aggregated result. The following example defines a pandas UDF that aggregates scores, grouping on the name_length column added in the previous example.
from pyspark.sql.functions import pandas_udf
import pandas as pd

# Define a pandas UDF that sums a series of scores
@pandas_udf("double")
def total_score_udf(scores: pd.Series) -> float:
    return scores.sum()

# Group by name length and aggregate the scores
result_df = (df.groupBy("name_length")
    .agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
See pandas user-defined functions for Python and User-defined aggregate functions - Scala.
Python user-defined table functions (UDTFs)
Important
This feature is in Public Preview.
Note
Python UDTFs are available in Databricks Runtime 14.3 LTS and above.
Python user-defined table functions (UDTFs) can return multiple rows and columns for each input row. In the following example, each value in the categories column is a comma-separated list, and a UDTF is defined to split each list into multiple rows, one per category. See Python user-defined table functions (UDTFs).
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
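A sketch that builds this input as a DataFrame df:
# Sample data with a comma-separated categories column
df = spark.createDataFrame(
    [
        ("alice", 10.0, "math,science"),
        ("bob", 20.0, "history,math"),
        ("carol", 30.0, "science,history"),
        ("dave", 40.0, "math,art"),
        ("eve", 50.0, "science,art"),
    ],
    ["name", "score", "categories"],
)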
from pyspark.sql.functions import udtf

@udtf(returnType="name: string, score: double, category: string")
class ScoreCategoriesUDTF:
    def eval(self, name: str, score: float, categories: str):
        # Yield one output row per category in the comma-separated list
        for category in categories.split(','):
            yield (name, score, category)
# Apply the UDTF
result_df = df.select(ScoreCategoriesUDTF(df.name, df.score, df.categories))
display(result_df)
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
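A session-scoped UDTF can also be registered for SQL and called in a FROM clause; a minimal sketch (the registered name score_categories is illustrative):
# Register the UDTF for SQL use within this session
spark.udtf.register("score_categories", ScoreCategoriesUDTF)
# Call it with scalar arguments; cast the literal so it arrives as a double
spark.sql(
    "SELECT * FROM score_categories('alice', CAST(10.0 AS DOUBLE), 'math,science')"
).show()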
Performance considerations
- Built-in functions and SQL UDFs are the most efficient options available.
- Scala UDFs are generally faster than Python UDFs because they execute within the Java Virtual Machine (JVM) and avoid the overhead of moving data in and out of the JVM.
- Python UDFs and pandas UDFs tend to be slower than Scala UDFs because they require data to be serialized and moved out of the JVM to the Python interpreter. pandas UDFs can be up to 100x faster than Python UDFs because they use Apache Arrow to reduce serialization costs, as the sketch below illustrates.
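To make the last point concrete, here is a sketch of the same increment written both ways; only the pandas version processes whole Arrow batches per Python call:
import pandas as pd
from pyspark.sql.functions import pandas_udf, udf
from pyspark.sql.types import DoubleType

# Row-at-a-time Python UDF: one serialized Python call per row
@udf(returnType=DoubleType())
def plus_one(v):
    return v + 1.0

# pandas UDF: one call per Arrow batch, vectorized with pandas
@pandas_udf("double")
def pandas_plus_one(v: pd.Series) -> pd.Series:
    return v + 1.0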