Delta Live Tables Python language reference
This article has details for the Delta Live Tables Python programming interface.
For information on the SQL API, see the Delta Live Tables SQL language reference.
For details specific to configuring Auto Loader, see What is Auto Loader?.
Before you begin
The following are important considerations when you implement pipelines with the Delta Live Tables Python interface:
- Because the Python table() and view() functions are invoked multiple times during the planning and running of a pipeline update, do not include code in one of these functions that might have side effects (for example, code that modifies data or sends an email). To avoid unexpected behavior, your Python functions that define datasets should include only the code required to define the table or view.
- To perform operations such as sending emails or integrating with an external monitoring service, particularly in functions that define datasets, use event hooks. Implementing these operations in the functions that define your datasets will cause unexpected behavior.
- The Python table and view functions must return a DataFrame. Some functions that operate on DataFrames do not return DataFrames and should not be used. These operations include functions such as collect(), count(), toPandas(), save(), and saveAsTable(). Because DataFrame transformations are executed after the full dataflow graph has been resolved, using such operations might have unintended side effects.
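The following plain-Python sketch illustrates why side effects in a dataset-defining function are a problem. It does not use the dlt module: toy_table is a hypothetical stand-in decorator that simply calls the decorated function twice, once for a planning pass and once for a run pass, which is an assumption made only for illustration.

```python
# Toy stand-in for a pipeline engine. This is NOT the dlt module.
calls = {"emails_sent": 0}

def toy_table(fn):
    """Hypothetical decorator that evaluates the function twice."""
    fn()  # planning pass: inspect the returned query
    fn()  # run pass: evaluate the returned query
    return fn

@toy_table
def customers():
    calls["emails_sent"] += 1  # side effect: repeated on every invocation
    return "SELECT * FROM raw_customers"  # placeholder for a DataFrame

print(calls["emails_sent"])  # 2
```

The side effect fires once per invocation rather than once per update, which is why such code belongs in an event hook instead.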
Import the dlt Python module
Delta Live Tables Python functions are defined in the dlt module. Pipelines implemented with the Python API must import this module:
import dlt
Create a Delta Live Tables materialized view or streaming table
In Python, Delta Live Tables determines whether to update a dataset as a materialized view or streaming table based on the defining query. The @table decorator can be used to define both materialized views and streaming tables.
To define a materialized view in Python, apply @table to a query that performs a static read against a data source. To define a streaming table, apply @table to a query that performs a streaming read against a data source or use the create_streaming_table() function. Both dataset types have the same syntax specification as follows:
Note
To use the cluster_by argument to enable liquid clustering, your pipeline must be configured to use the preview channel.
import dlt

@dlt.table(
    name="<name>",
    comment="<comment>",
    spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
    table_properties={"<key>" : "<value>", "<key>" : "<value>"},
    path="<storage-location-path>",
    partition_cols=["<partition-column>", "<partition-column>"],
    cluster_by = ["<clustering-column>", "<clustering-column>"],
    schema="schema-definition",
    row_filter = "row-filter-clause",
    temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)
Create a Delta Live Tables view
To define a view in Python, apply the @view decorator. As with the @table decorator, you can use views in Delta Live Tables for either static or streaming datasets. The following is the syntax for defining views with Python:
import dlt

@dlt.view(
    name="<name>",
    comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)
Example: Define tables and views
To define a table or view in Python, apply the @dlt.view or @dlt.table decorator to a function. You can use the function name or the name parameter to assign the table or view name. The following example defines two different datasets: a view called taxi_raw that takes a JSON file as the input source and a table called filtered_data that takes the taxi_raw view as input:
import dlt

@dlt.view
def taxi_raw():
    return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
    return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
    name="filtered_data")
def create_filtered_data():
    return dlt.read("taxi_raw").where(...)
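As a rough illustration of how a decorator can take the dataset name from either the function name or a name parameter, the following sketch uses a hypothetical toy_table decorator and an in-memory registry. Both are assumptions made for this example and are not part of the dlt module or a description of how it is implemented.

```python
# Hypothetical in-memory registry of dataset name -> defining function.
registry = {}

def toy_table(_fn=None, *, name=None):
    """Toy decorator usable as @toy_table or @toy_table(name=...)."""
    def register(fn):
        # Fall back to the function name when no explicit name is given.
        registry[name or fn.__name__] = fn
        return fn

    if _fn is not None:
        return register(_fn)  # bare form: @toy_table
    return register           # called form: @toy_table(name="...")

@toy_table
def filtered_data():
    return "query A"

@toy_table(name="filtered_copy")
def create_filtered_copy():
    return "query B"

print(sorted(registry))  # ['filtered_copy', 'filtered_data']
```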
Example: Access a dataset defined in the same pipeline
In addition to reading from external data sources, you can access datasets defined in the same pipeline with the Delta Live Tables read() function. The following example demonstrates creating a customers_filtered dataset using the read() function:
@dlt.table
def customers_raw():
    return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
    return dlt.read("customers_raw").where(...)
You can also use the spark.table() function to access a dataset defined in the same pipeline. When using the spark.table() function to access a dataset defined in the pipeline, prepend the LIVE keyword to the dataset name in the function argument:
@dlt.table
def customers_raw():
    return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
    return spark.table("LIVE.customers_raw").where(...)
Example: Read from a table registered in a metastore
To read data from a table registered in the Hive metastore, omit the LIVE keyword in the function argument and optionally qualify the table name with the database name:
@dlt.table
def customers():
    return spark.table("sales.customers").where(...)
For an example of reading from a Unity Catalog table, see Ingest data into a Unity Catalog pipeline.
Example: Access a dataset using spark.sql
You can also return a dataset using a spark.sql expression in a query function. To read from an internal dataset, prepend LIVE. to the dataset name:
@dlt.table
def chicago_customers():
    return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Create a table to use as the target of streaming operations
Use the create_streaming_table() function to create a target table for records output by streaming operations, including apply_changes(), apply_changes_from_snapshot(), and @append_flow output records.
Note
The create_target_table() and create_streaming_live_table() functions are deprecated. Databricks recommends updating existing code to use the create_streaming_table() function.
Note
To use the cluster_by argument to enable liquid clustering, your pipeline must be configured to use the preview channel.
create_streaming_table(
    name = "<table-name>",
    comment = "<comment>",
    spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
    table_properties={"<key>" : "<value>", "<key>" : "<value>"},
    partition_cols=["<partition-column>", "<partition-column>"],
    cluster_by = ["<clustering-column>", "<clustering-column>"],
    path="<storage-location-path>",
    schema="schema-definition",
    expect_all = {"<key>" : "<value>", "<key>" : "<value>"},
    expect_all_or_drop = {"<key>" : "<value>", "<key>" : "<value>"},
    expect_all_or_fail = {"<key>" : "<value>", "<key>" : "<value>"},
    row_filter = "row-filter-clause"
)
Arguments |
---|
name Type: str The table name. This parameter is required. |
comment Type: str An optional description for the table. |
spark_conf Type: dict An optional list of Spark configurations for the execution of this query. |
table_properties Type: dict An optional list of table properties for the table. |
partition_cols Type: array An optional list of one or more columns to use for partitioning the table. |
cluster_by Type: array Optionally enable liquid clustering on the table and define the columns to use as clustering keys. See Use liquid clustering for Delta tables. |
path Type: str An optional storage location for table data. If not set, the system defaults to the pipeline storage location. |
schema Type: str or StructType An optional schema definition for the table. Schemas can be defined as a SQL DDL string or with a Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Type: dict Optional data quality constraints for the table. See multiple expectations. |
row_filter (Public Preview) Type: str An optional row filter clause for the table. See Publish tables with row filters and column masks. |
Control how tables are materialized
Tables also offer additional control of their materialization:
- Specify how tables are partitioned using partition_cols. You can use partitioning to speed up queries.
- You can set table properties when you define a view or table. See Delta Live Tables table properties.
- Set a storage location for table data using the path setting. By default, table data is stored in the pipeline storage location if path isn’t set.
- You can use generated columns in your schema definition. See Example: Specify a schema and partition columns.
Note
For tables less than 1 TB in size, Databricks recommends letting Delta Live Tables control data organization. You should not specify partition columns unless you expect your table to grow beyond a terabyte.
Example: Specify a schema and partition columns
You can optionally specify a table schema using a Python StructType or a SQL DDL string. When specified with a DDL string, the definition can include generated columns.
The following example creates a table called sales with a schema specified using a Python StructType:
import dlt
from pyspark.sql.types import LongType, StringType, StructField, StructType

sales_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("number_of_line_items", StringType(), True),
    StructField("order_datetime", StringType(), True),
    StructField("order_number", LongType(), True)
])

@dlt.table(
    comment="Raw data on sales",
    schema=sales_schema)
def sales():
    return ("...")
The following example specifies the schema for a table using a DDL string, defines a generated column, and defines a partition column:
@dlt.table(
    comment="Raw data on sales",
    schema="""
        customer_id STRING,
        customer_name STRING,
        number_of_line_items STRING,
        order_datetime STRING,
        order_number LONG,
        order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
    partition_cols = ["order_day_of_week"])
def sales():
    return ("...")
By default, Delta Live Tables infers the schema from the table definition if you don’t specify a schema.
Configure a streaming table to ignore changes in a source streaming table
Note
- The skipChangeCommits flag works only with spark.readStream using the option() function. You cannot use this flag in a dlt.read_stream() function.
- You cannot use the skipChangeCommits flag when the source streaming table is defined as the target of an apply_changes() function.
By default, streaming tables require append-only sources. When a streaming table uses another streaming table as a source, and the source streaming table requires updates or deletes (for example, GDPR “right to be forgotten” processing), the skipChangeCommits flag can be set when reading the source streaming table to ignore those changes. For more information about this flag, see Ignore updates and deletes.
@dlt.table
def b():
    return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Example: Define table constraints
Important
Table constraints are in Public Preview.
When specifying a schema, you can define primary and foreign keys. The constraints are informational and are not enforced. See the CONSTRAINT clause in the SQL language reference.
The following example defines a table with a primary and foreign key constraint:
@dlt.table(
    schema="""
        customer_id STRING NOT NULL PRIMARY KEY,
        customer_name STRING,
        number_of_line_items STRING,
        order_datetime STRING,
        order_number LONG,
        order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
        CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """)
def sales():
    return ("...")
Example: Define a row filter and column mask
Important
Row filters and column masks are in Public Preview.
To create a materialized view or streaming table with a row filter and column mask, use the ROW FILTER clause and the MASK clause. The following example demonstrates how to define a materialized view and a streaming table with both a row filter and a column mask:
@dlt.table(
    schema="""
        id int COMMENT 'This is the customer ID',
        name string COMMENT 'This is the customer full name',
        region string,
        ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
    row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)")
def sales():
    return ("...")
For more information on row filters and column masks, see Publish tables with row filters and column masks.
Python Delta Live Tables properties
The following tables describe the options and properties you can specify while defining tables and views with Delta Live Tables:
Note
To use the cluster_by argument to enable liquid clustering, your pipeline must be configured to use the preview channel.
@table or @view |
---|
name Type: str An optional name for the table or view. If not defined, the function name is used as the table or view name. |
comment Type: str An optional description for the table. |
spark_conf Type: dict An optional list of Spark configurations for the execution of this query. |
table_properties Type: dict An optional list of table properties for the table. |
path Type: str An optional storage location for table data. If not set, the system defaults to the pipeline storage location. |
partition_cols Type: a collection of str An optional collection, for example, a list of one or more columns to use for partitioning the table. |
cluster_by Type: array Optionally enable liquid clustering on the table and define the columns to use as clustering keys. See Use liquid clustering for Delta tables. |
schema Type: str or StructType An optional schema definition for the table. Schemas can be defined as a SQL DDL string or with a Python StructType . |
temporary Type: bool Create a table, but do not publish metadata for the table. The temporary keyword instructs Delta Live Tables to create a table that is available to the pipeline but should not be accessed outside the pipeline. To reduce processing time, a temporary table persists for the lifetime of the pipeline that creates it, and not just a single update. The default is False. |
row_filter (Public Preview) Type: str An optional row filter clause for the table. See Publish tables with row filters and column masks. |
Table or view definition |
---|
def <function-name>() A Python function that defines the dataset. If the name parameter is not set, then <function-name> is used as the target dataset name. |
query A Spark SQL statement that returns a Spark Dataset or Koalas DataFrame. Use dlt.read() or spark.table() to perform a complete read from a dataset defined in the same pipeline. When using the spark.table() function to read from a dataset defined in the same pipeline, prepend the LIVE keyword to the dataset name in the function argument. For example, to read from a dataset named customers: spark.table("LIVE.customers") You can also use the spark.table() function to read from a table registered in the metastore by omitting the LIVE keyword and optionally qualifying the table name with the database name: spark.table("sales.customers") Use dlt.read_stream() to perform a streaming read from a dataset defined in the same pipeline. Use the spark.sql function to define a SQL query to create the return dataset. Use PySpark syntax to define Delta Live Tables queries with Python. |
Expectations |
---|
@expect("description", "constraint") Declare a data quality constraint identified by description . If a row violates the expectation, include the row in the target dataset. |
@expect_or_drop("description", "constraint") Declare a data quality constraint identified by description . If a row violates the expectation, drop the row from the target dataset. |
@expect_or_fail("description", "constraint") Declare a data quality constraint identified by description . If a row violates the expectation, immediately stop execution. |
@expect_all(expectations) Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, include the row in the target dataset. |
@expect_all_or_drop(expectations) Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, drop the row from the target dataset. |
@expect_all_or_fail(expectations) Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, immediately stop execution. |
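The three violation behaviors in the table above (retain, drop, fail) can be sketched in plain Python. The apply_expectation helper, its action names, and the list-of-dicts rows are hypothetical and exist only for this illustration. In a real pipeline the constraint is a SQL expression string evaluated by Delta Live Tables, not a Python callable.

```python
def apply_expectation(rows, constraint, action):
    """Toy model of expectation handling.

    action is one of:
      "warn" - keep violating rows (like @expect)
      "drop" - discard violating rows (like @expect_or_drop)
      "fail" - stop on the first violation (like @expect_or_fail)
    Returns the surviving rows and the number of violations seen.
    """
    kept, violations = [], 0
    for row in rows:
        if constraint(row):
            kept.append(row)
            continue
        violations += 1
        if action == "fail":
            raise ValueError(f"expectation violated by {row}")
        if action == "warn":
            kept.append(row)
    return kept, violations

rows = [{"id": 1}, {"id": None}]
has_id = lambda row: row["id"] is not None

print(apply_expectation(rows, has_id, "warn"))  # ([{'id': 1}, {'id': None}], 1)
print(apply_expectation(rows, has_id, "drop"))  # ([{'id': 1}], 1)
```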
Change data capture from a change feed with Python in Delta Live Tables
Use the apply_changes() function in the Python API to use Delta Live Tables change data capture (CDC) functionality to process source data from a change data feed (CDF).
Important
You must declare a target streaming table to apply changes into. You can optionally specify the schema for your target table. When specifying the schema of the apply_changes() target table, you must include the __START_AT and __END_AT columns with the same data type as the sequence_by field.
To create the required target table, you can use the create_streaming_table() function in the Delta Live Tables Python interface.
apply_changes(
    target = "<target-table>",
    source = "<data-source>",
    keys = ["key1", "key2", "keyN"],
    sequence_by = "<sequence-column>",
    ignore_null_updates = False,
    apply_as_deletes = None,
    apply_as_truncates = None,
    column_list = None,
    except_column_list = None,
    stored_as_scd_type = <type>,
    track_history_column_list = None,
    track_history_except_column_list = None
)
Note
For APPLY CHANGES processing, the default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the APPLY AS DELETE WHEN condition.
To learn more about CDC processing with a change feed, see The APPLY CHANGES APIs: Simplify change data capture with Delta Live Tables. For an example of using the apply_changes() function, see Example: SCD type 1 and SCD type 2 processing with CDF source data.
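To make the upsert and delete semantics concrete, here is a plain-Python sketch of SCD type 1 processing over a list of CDC events. It is an illustration under stated assumptions, not the Delta Live Tables implementation: apply_changes_scd1, the event dictionaries, and the op and seq column names are all hypothetical. A deleted key is kept as a tombstone (sequence number with no row) so that a late-arriving older event cannot resurrect it.

```python
def apply_changes_scd1(events, key, sequence_by, is_delete):
    """Toy SCD type 1 upsert that tolerates out-of-order events."""
    state = {}  # key value -> (sequence, row), where row is None for a tombstone
    for event in events:
        k, seq = event[key], event[sequence_by]
        if k in state and state[k][0] >= seq:
            continue  # stale event: a newer change was already applied
        state[k] = (seq, None if is_delete(event) else event)
    # Hide tombstones from the resulting table.
    return {k: row for k, (_, row) in state.items() if row is not None}

events = [
    {"userId": 1, "name": "Ana", "op": "INSERT", "seq": 1},
    {"userId": 1, "name": "Ana Maria", "op": "UPDATE", "seq": 3},
    {"userId": 1, "name": "Anna", "op": "UPDATE", "seq": 2},  # arrives late
    {"userId": 2, "name": "Bo", "op": "INSERT", "seq": 1},
    {"userId": 2, "name": None, "op": "DELETE", "seq": 4},
    {"userId": 2, "name": "Bo", "op": "UPDATE", "seq": 2},    # arrives after the delete
]

target = apply_changes_scd1(
    events, key="userId", sequence_by="seq",
    is_delete=lambda e: e["op"] == "DELETE",
)
print(target[1]["name"])  # Ana Maria
print(2 in target)        # False
```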
Arguments |
---|
target Type: str The name of the table to be updated. You can use the create_streaming_table() function to create the target table before executing the apply_changes() function. This parameter is required. |
source Type: str The data source containing CDC records. This parameter is required. |
keys Type: list The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table. You can specify either: - A list of strings: ["userId", "orderId"] - A list of Spark SQL col() functions: [col("userId"), col("orderId")] Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId). This parameter is required. |
sequence_by Type: str or col() The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order. You can specify either: - A string: "sequenceNum" - A Spark SQL col() function: col("sequenceNum") Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId). The specified column must be a sortable data type. This parameter is required. |
ignore_null_updates Type: bool Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null retain their existing values in the target. This also applies to nested columns with a value of null. When ignore_null_updates is False, existing values are overwritten with null values. This parameter is optional. The default is False. |
apply_as_deletes Type: str or expr() Specifies when a CDC event should be treated as a DELETE rather than an upsert. To handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones. The retention interval can be configured with the pipelines.cdc.tombstoneGCThresholdInSeconds table property. You can specify either: - A string: "Operation = 'DELETE'" - A Spark SQL expr() function: expr("Operation = 'DELETE'") This parameter is optional. |
apply_as_truncates Type: str or expr() Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality. The apply_as_truncates parameter is supported only for SCD type 1. SCD type 2 does not support truncate operations. You can specify either: - A string: "Operation = 'TRUNCATE'" - A Spark SQL expr() function: expr("Operation = 'TRUNCATE'") This parameter is optional. |
column_list except_column_list Type: list A subset of columns to include in the target table. Use column_list to specify the complete list of columns to include. Use except_column_list to specify the columns to exclude. You can declare either value as a list of strings or as Spark SQL col() functions: - column_list = ["userId", "name", "city"] - column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum")] Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId). This parameter is optional. The default is to include all columns in the target table when no column_list or except_column_list argument is passed to the function. |
stored_as_scd_type Type: str or int Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2. This clause is optional. The default is SCD type 1. |
track_history_column_list track_history_except_column_list Type: list A subset of output columns to be tracked for history in the target table. Use track_history_column_list to specify the complete list of columns to be tracked. Use track_history_except_column_list to specify the columns to be excluded from tracking. You can declare either value as a list of strings or as Spark SQL col() functions: - track_history_column_list = ["userId", "name", "city"] - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")] Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId). This parameter is optional. The default is to include all columns in the target table when no track_history_column_list or track_history_except_column_list argument is passed to the function. |
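The effect of ignore_null_updates on a matching row can be sketched in plain Python. The merge_update helper and the dictionary rows are hypothetical and only model top-level columns. Nested columns, which the argument description also covers, are left out of this sketch.

```python
def merge_update(existing, update, ignore_null_updates=False):
    """Toy merge of one CDC update into an existing target row."""
    merged = dict(existing)
    for column, value in update.items():
        if value is None and ignore_null_updates:
            continue  # keep the existing value instead of writing null
        merged[column] = value
    return merged

existing = {"userId": 1, "name": "Ana", "city": "Lima"}
update = {"userId": 1, "name": None, "city": "Cusco"}

print(merge_update(existing, update, ignore_null_updates=True))
# {'userId': 1, 'name': 'Ana', 'city': 'Cusco'}
print(merge_update(existing, update, ignore_null_updates=False))
# {'userId': 1, 'name': None, 'city': 'Cusco'}
```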
Change data capture from database snapshots with Python in Delta Live Tables
Important
The APPLY CHANGES FROM SNAPSHOT API is in Public Preview.
Use the apply_changes_from_snapshot() function in the Python API to use Delta Live Tables change data capture (CDC) functionality to process source data from database snapshots.
Important
You must declare a target streaming table to apply changes into. You can optionally specify the schema for your target table. When specifying the schema of the apply_changes_from_snapshot() target table, you must also include the __START_AT and __END_AT columns with the same data type as the sequence_by field.
To create the required target table, you can use the create_streaming_table() function in the Delta Live Tables Python interface.
apply_changes_from_snapshot(
    target = "<target-table>",
    source = Any,
    keys = ["key1", "key2", "keyN"],
    stored_as_scd_type = <type>,
    track_history_column_list = None,
    track_history_except_column_list = None
) -> None
Note
For APPLY CHANGES FROM SNAPSHOT processing, the default behavior is to insert a new row when a matching record with the same key(s) does not exist in the target. If a matching record does exist, it is updated only if any of the values in the row have changed. Rows with keys present in the target but no longer present in the source are deleted.
To learn more about CDC processing with snapshots, see The APPLY CHANGES APIs: Simplify change data capture with Delta Live Tables. For examples of using the apply_changes_from_snapshot() function, see the periodic snapshot ingestion and historical snapshot ingestion examples.
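The insert, update, and delete rules described in the note above can be sketched as a comparison between the current target and a new snapshot. The diff_snapshot helper and the dictionary rows are hypothetical, and the sketch only models SCD type 1 behavior in plain Python. It is not how Delta Live Tables computes the changes internally.

```python
def diff_snapshot(target, snapshot, key):
    """Toy comparison of a target table (key -> row) with a new snapshot."""
    new_rows = {row[key]: row for row in snapshot}
    inserts = [k for k in new_rows if k not in target]
    # Update only when at least one value in the row has changed.
    updates = [k for k in new_rows if k in target and target[k] != new_rows[k]]
    # Keys missing from the snapshot are deleted from the target.
    deletes = [k for k in target if k not in new_rows]
    return inserts, updates, deletes, new_rows

target = {
    1: {"id": 1, "city": "Lima"},
    2: {"id": 2, "city": "Quito"},
    3: {"id": 3, "city": "La Paz"},
}
snapshot = [
    {"id": 1, "city": "Lima"},    # unchanged
    {"id": 2, "city": "Bogota"},  # changed value
    {"id": 4, "city": "Cali"},    # new key
]

inserts, updates, deletes, new_target = diff_snapshot(target, snapshot, key="id")
print(inserts, updates, deletes)  # [4] [2] [3]
```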
Arguments |
---|
target Type: str The name of the table to be updated. You can use the create_streaming_table() function to create the target table before running the apply_changes_from_snapshot() function. This parameter is required. |
source Type: str or lambda function Either the name of a table or view to snapshot periodically or a Python lambda function that returns the snapshot DataFrame to be processed and the snapshot version. See Implement the source argument. This parameter is required. |
keys Type: list The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table. You can specify either: - A list of strings: ["userId", "orderId"] - A list of Spark SQL col() functions: [col("userId"), col("orderId")] Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId). This parameter is required. |
stored_as_scd_type Type: str or int Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2. This clause is optional. The default is SCD type 1. |
track_history_column_list track_history_except_column_list Type: list A subset of output columns to be tracked for history in the target table. Use track_history_column_list to specify the complete list of columns to be tracked. Use track_history_except_column_list to specify the columns to be excluded from tracking. You can declare either value as a list of strings or as Spark SQL col() functions: - track_history_column_list = ["userId", "name", "city"] - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")] Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId). This parameter is optional. The default is to include all columns in the target table when no track_history_column_list or track_history_except_column_list argument is passed to the function. |
Implement the source argument
The apply_changes_from_snapshot() function includes the source argument. For processing historical snapshots, the source argument is expected to be a Python lambda function that returns two values to the apply_changes_from_snapshot() function: a DataFrame containing the snapshot data to be processed and a snapshot version.
The following is the signature of the lambda function:
lambda Any => Optional[(DataFrame, Any)]
- The argument to the lambda function is the most recently processed snapshot version.
- The return value of the lambda function is None or a tuple of two values: The first value of the tuple is a DataFrame containing the snapshot to be processed. The second value of the tuple is the snapshot version that represents the logical order of the snapshot.
An example that implements and calls the lambda function:
def next_snapshot_and_version(latest_snapshot_version):
    if latest_snapshot_version is None:
        return (spark.read.load("filename.csv"), 1)
    else:
        return None

apply_changes_from_snapshot(
    # ...
    source = next_snapshot_and_version,
    # ...
)
The Delta Live Tables runtime performs the following steps each time the pipeline that contains the apply_changes_from_snapshot() function is triggered:
1. Runs the next_snapshot_and_version function to load the next snapshot DataFrame and the corresponding snapshot version.
2. If no DataFrame returns, the run is terminated and the pipeline update is marked as complete.
3. Detects the changes in the new snapshot and incrementally applies them to the target table.
4. Returns to step #1 to load the next snapshot and its version.
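The loop described in these steps can be sketched in plain Python. The run_update driver, the in-memory SNAPSHOTS dictionary, and the use of lists in place of DataFrames are assumptions made for illustration only. The actual runtime also remembers the latest processed version between pipeline updates, which this sketch does not model.

```python
# Hypothetical in-memory snapshots keyed by version (stand-ins for DataFrames).
SNAPSHOTS = {
    1: [{"id": 1, "city": "Lima"}],
    2: [{"id": 1, "city": "Cusco"}],
}

def next_snapshot_and_version(latest_snapshot_version):
    """Return (snapshot, version) for the next version, or None when done."""
    next_version = 1 if latest_snapshot_version is None else latest_snapshot_version + 1
    if next_version not in SNAPSHOTS:
        return None
    return SNAPSHOTS[next_version], next_version

def run_update(source):
    """Toy driver: keep calling source until it returns None."""
    processed_versions = []
    latest = None
    while True:
        result = source(latest)
        if result is None:
            break  # no more snapshots: the update is complete
        snapshot, latest = result
        processed_versions.append(latest)  # stand-in for applying the changes
    return processed_versions

print(run_update(next_snapshot_and_version))  # [1, 2]
```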
Limitations
The Delta Live Tables Python interface has the following limitation:
The pivot() function is not supported. The pivot operation in Spark requires eager loading of input data to compute the schema of the output. This capability is not supported in Delta Live Tables.