Delta Live Tables Python language reference

This article provides details for the Delta Live Tables Python programming interface.

For information on the SQL API, see the Delta Live Tables SQL language reference.

For details specific to configuring Auto Loader, see What is Auto Loader?.

Limitations

The Delta Live Tables Python interface has the following limitations:

  • The Python table and view functions must return a DataFrame. Some functions that operate on DataFrames do not return DataFrames and should not be used. Because DataFrame transformations are executed after the full dataflow graph has been resolved, using such operations might have unintended side effects. These operations include functions such as collect(), count(), toPandas(), save(), and saveAsTable(). However, you can include these functions outside of table or view function definitions because this code is run once during the graph initialization phase.
  • The pivot() function is not supported. The pivot operation in Spark requires eager loading of input data to compute the schema of the output. This capability is not supported in Delta Live Tables.

Import the dlt Python module

Delta Live Tables Python functions are defined in the dlt module. Your pipelines implemented with the Python API must import this module:

import dlt

Create a Delta Live Tables materialized view or streaming table

In Python, Delta Live Tables determines whether to update a dataset as a materialized view or streaming table based on the defining query. The @table decorator is used to define both materialized views and streaming tables.

To define a materialized view in Python, apply @table to a query that performs a static read against a data source. To define a streaming table, apply @table to a query that performs a streaming read against a data source. Both dataset types have the same syntax specification as follows:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Create a Delta Live Tables view

To define a view in Python, apply the @view decorator. As with the @table decorator, you can use views in Delta Live Tables for either static or streaming datasets. The following is the syntax for defining views with Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Example: Define tables and views

To define a table or view in Python, apply the @dlt.view or @dlt.table decorator to a function. You can use the function name or the name parameter to assign the table or view name. The following example defines two different datasets: a view called taxi_raw that takes a JSON file as the input source and a table called filtered_data that takes the taxi_raw view as input:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)
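The naming rule can be seen in isolation with a small stand-in. The following plain-Python sketch mimics a decorator that works both bare and with a name argument. The table function and the registry dictionary here are hypothetical and are not the dlt implementation; the sketch only illustrates how a dataset name could be taken from the name parameter or, when it is absent, from the function name.

```python
# Hypothetical stand-in for illustration only; this is not the dlt module.
registry = {}

def table(func=None, *, name=None):
    """Register a dataset function under `name`, or under the function name."""
    def register(f):
        registry[name or f.__name__] = f
        return f
    # Bare use (@table) passes the function directly; @table(name=...) does not.
    return register(func) if func is not None else register

# The function name becomes the dataset name.
@table
def filtered_data():
    return "query A"

# The name argument overrides the function name.
@table(name="renamed_data")
def create_filtered_data():
    return "query B"

print(sorted(registry))  # ['filtered_data', 'renamed_data']
```

In this sketch the second dataset is registered as renamed_data, and the function name create_filtered_data never appears as a dataset name.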

Example: Access a dataset defined in the same pipeline

In addition to reading from external data sources, you can access datasets defined in the same pipeline with the Delta Live Tables read() function. The following example demonstrates creating a customers_filteredA dataset using the read() function:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

You can also use the spark.table() function to access a dataset defined in the same pipeline. When using the spark.table() function to access a dataset defined in the pipeline, in the function argument prepend the LIVE keyword to the dataset name:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Example: Read from a table registered in a metastore

To read data from a table registered in the Hive metastore, in the function argument omit the LIVE keyword and optionally qualify the table name with the database name:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

For an example of reading from a Unity Catalog table, see Ingest data into a Unity Catalog pipeline.

Example: Access a dataset using spark.sql

You can also return a dataset using a spark.sql expression in a query function. To read from an internal dataset, prepend LIVE. to the dataset name:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Write to a streaming table from multiple source streams

Important

Delta Live Tables support for @append_flow is in Public Preview.

You can use the @append_flow decorator to write to a streaming table from multiple streaming sources to do the following:

  • Add and remove streaming sources that append data to an existing streaming table without requiring a full refresh. For example, you might have a table that combines regional data from every region you’re operating in. As new regions are rolled out, you can add the new region data to the table without performing a full refresh.
  • Update a streaming table by appending missing historical data (backfilling). For example, you have an existing streaming table that receives data from an Apache Kafka topic. You also have historical data stored in a table that must be inserted exactly once into the streaming table, and you cannot stream that data because it requires a complex aggregation before insertion.

To create a target table for the records output by the @append_flow processing, use the create_streaming_table() function.

Note

If you need to define data quality constraints with expectations, define the expectations on the target table as part of the create_streaming_table() function. You cannot define expectations in the @append_flow definition.

The following is the syntax for @append_flow:

import dlt

dlt.create_streaming_table("<target-table-name>")

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

Example: Write to a streaming table from multiple Kafka topics

The following example creates a streaming table named kafka_target and writes to that streaming table from two Kafka topics:

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

Example: Run a one-time data backfill

The following example runs a query to append historical data to a streaming table:

Note

To ensure a true one-time backfill when the backfill query is part of a pipeline that runs on a scheduled basis or continuously, remove the query after running the pipeline once. To append new data if it arrives in the backfill directory, leave the query in place.

import dlt

@dlt.table()
def csv_target():
  return spark.readStream.format("csv").load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream.format("csv").load("path/to/backfill/data/dir")

Create a table to use as the target of streaming operations

Use the create_streaming_table() function to create a target table for records output by streaming operations, including apply_changes() and @append_flow output records.

Note

The create_target_table() and create_streaming_live_table() functions are deprecated. Databricks recommends updating existing code to use the create_streaming_table() function.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value>", "<key>" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value>", "<key>" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value>", "<key>" : "<value>"}
)

Arguments
name

Type: str

The table name.

This parameter is required.
comment

Type: str

An optional description for the table.
spark_conf

Type: dict

An optional list of Spark configurations for the execution of this query.
table_properties

Type: dict

An optional list of table properties for the table.
partition_cols

Type: array

An optional list of one or more columns to use for partitioning the table.
path

Type: str

An optional storage location for table data. If not set, the system will default to the pipeline storage location.
schema

Type: str or StructType

An optional schema definition for the table. Schemas can be defined as a SQL DDL string, or with a Python StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Type: dict

Optional data quality constraints for the table. See multiple expectations.

Control how tables are materialized

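Tables also offer additional control of their materialization, for example through the partition_cols, path, and table_properties settings described in this article.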

Note

For tables less than 1 TB in size, Databricks recommends letting Delta Live Tables control data organization. Unless you expect your table to grow beyond a terabyte, you should generally not specify partition columns.

Example: Specify a schema and partition columns

You can optionally specify a table schema using a Python StructType or a SQL DDL string. When specified with a DDL string, the definition can include generated columns.

The following example creates a table called sales with a schema specified using a Python StructType:

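import dlt
from pyspark.sql.types import LongType, StringType, StructField, StructType

# The third argument to StructField marks each column as nullable.
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True),
])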

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

The following example specifies the schema for a table using a DDL string, defines a generated column, and defines a partition column:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

By default, Delta Live Tables infers the schema from the table definition if you don’t specify a schema.

Configure a streaming table to ignore changes in a source streaming table

Note

  • The skipChangeCommits flag works only with spark.readStream using the option() function. You cannot use this flag in a dlt.read_stream() function.
  • You cannot use the skipChangeCommits flag when the source streaming table is defined as the target of an apply_changes() function.

By default, streaming tables require append-only sources. When a streaming table uses another streaming table as a source, and the source streaming table requires updates or deletes, for example, GDPR “right to be forgotten” processing, the skipChangeCommits flag can be set when reading the source streaming table to ignore those changes. For more information about this flag, see Ignore updates and deletes.

@dlt.table
def b():
  return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
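The effect of the flag can be modeled in plain Python. This sketch assumes a source made of commits that either append rows or change existing ones. The commit dictionaries and the read_source function are hypothetical and only illustrate the behavior described above, not how Delta Live Tables or Structured Streaming implement it.

```python
# Plain-Python model for illustration; not the Spark or dlt API.

def read_source(commits, skip_change_commits=False):
    """Collect appended rows, optionally skipping commits that update or delete."""
    rows = []
    for commit in commits:
        if commit["kind"] == "append":
            rows.extend(commit["rows"])
        elif not skip_change_commits:
            # Models the default append-only requirement.
            raise RuntimeError(f"non-append commit in source: {commit['kind']}")
        # With the flag set, the change commit is ignored entirely.
    return rows

commits = [
    {"kind": "append", "rows": [{"id": 1}, {"id": 2}]},
    {"kind": "delete", "rows": [{"id": 1}]},  # for example, a GDPR erasure
    {"kind": "append", "rows": [{"id": 3}]},
]

print(read_source(commits, skip_change_commits=True))
# [{'id': 1}, {'id': 2}, {'id': 3}]
```

In this model the result still contains the row with id 1, because skipping the commit ignores the change rather than propagating it downstream. Without the flag, the same input raises an error at the delete commit.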

Python Delta Live Tables properties

The following tables describe the options and properties you can specify while defining tables and views with Delta Live Tables:

@table or @view
name

Type: str

An optional name for the table or view. If not defined, the function name is used as the table or view name.
comment

Type: str

An optional description for the table.
spark_conf

Type: dict

An optional list of Spark configurations for the execution of this query.
table_properties

Type: dict

An optional list of table properties for the table.
path

Type: str

An optional storage location for table data. If not set, the system will default to the pipeline storage location.
partition_cols

Type: a collection of str

An optional collection, for example, a list, of one or more columns to use for partitioning the table.
schema

Type: str or StructType

An optional schema definition for the table. Schemas can be defined as a SQL DDL string, or with a Python StructType.
temporary

Type: bool

Create a table but do not publish metadata for the table. The temporary keyword instructs Delta Live Tables to create a table that is available to the pipeline but should not be accessed outside the pipeline. To reduce processing time, a temporary table persists for the lifetime of the pipeline that creates it, and not just a single update.

The default is False.
Table or view definition
def <function-name>()

A Python function that defines the dataset. If the name parameter is not set, then <function-name> is used as the target dataset name.
query

A Spark SQL statement that returns a Spark Dataset or Koalas DataFrame.

Use dlt.read() or spark.table() to perform a complete read from a dataset defined in the same pipeline. When using the spark.table() function to read from a dataset defined in the same pipeline, prepend the LIVE keyword to the dataset name in the function argument. For example, to read from a dataset named customers:

spark.table("LIVE.customers")

You can also use the spark.table() function to read from a table registered in the metastore by omitting the LIVE keyword and optionally qualifying the table name with the database name:

spark.table("sales.customers")

Use dlt.read_stream() to perform a streaming read from a dataset defined in the same pipeline.

Use the spark.sql function to define a SQL query to create the return dataset.

Use PySpark syntax to define Delta Live Tables queries with Python.
Expectations
@expect("description", "constraint")

Declare a data quality constraint identified by description. If a row violates the expectation, include the row in the target dataset.
@expect_or_drop("description", "constraint")

Declare a data quality constraint identified by description. If a row violates the expectation, drop the row from the target dataset.
@expect_or_fail("description", "constraint")

Declare a data quality constraint identified by description. If a row violates the expectation, immediately stop execution.
@expect_all(expectations)

Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, include the row in the target dataset.
@expect_all_or_drop(expectations)

Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, drop the row from the target dataset.
@expect_all_or_fail(expectations)

Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, immediately stop execution.
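The three violation policies differ only in what happens to a row that fails a constraint. The following plain-Python sketch models that difference. It is not the dlt API: the apply_expectations function and its on_violation argument are hypothetical, and constraints are written as Python predicates here, whereas the decorators take SQL constraint expressions.

```python
# Plain-Python model of the violation policies; not the dlt API.

def apply_expectations(rows, expectations, on_violation):
    """Return the rows that reach the target under the given policy.

    on_violation is "keep" (like expect_all), "drop" (like expect_all_or_drop),
    or "fail" (like expect_all_or_fail).
    """
    kept = []
    for row in rows:
        violated = [desc for desc, check in expectations.items() if not check(row)]
        if violated and on_violation == "fail":
            raise ValueError(f"expectation violated: {violated[0]}")
        if violated and on_violation == "drop":
            continue
        kept.append(row)
    return kept

rows = [{"id": 1, "age": 30}, {"id": None, "age": 41}, {"id": 3, "age": -5}]
rules = {
    "valid_id": lambda r: r["id"] is not None,
    "valid_age": lambda r: r["age"] >= 0,
}

print(len(apply_expectations(rows, rules, "keep")))  # 3
print(apply_expectations(rows, rules, "drop"))       # [{'id': 1, 'age': 30}]
```

With the "fail" policy, the same input raises an error at the first violating row, which models the immediate stop described above.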

Change data capture with Python in Delta Live Tables

Use the apply_changes() function in the Python API to use Delta Live Tables CDC functionality. The Delta Live Tables Python interface also provides the create_streaming_table() function. You can use this function to create the target table required by the apply_changes() function.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Note

The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the apply_as_deletes argument (the APPLY AS DELETE WHEN condition in SQL).

Important

You must declare a target streaming table to apply changes into. You can optionally specify the schema for your target table. When specifying the schema of the apply_changes target table, you must also include the __START_AT and __END_AT columns with the same data type as the sequence_by field.

See Simplified change data capture with the APPLY CHANGES API in Delta Live Tables.
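The upsert, delete, and sequencing behavior described above can be modeled in a few lines of plain Python. The sketch below is an assumption-laden illustration of SCD type 1 semantics, not the dlt implementation: apply_changes_scd1 and its arguments are hypothetical names, the target is a dictionary keyed by the key columns, and an event whose sequence value is not newer than the last one applied for its key is skipped.

```python
# Plain-Python model of SCD type 1 upsert semantics; not the dlt implementation.

def apply_changes_scd1(events, keys, sequence_by, is_delete):
    """Fold CDC events into a {key_tuple: row} target; the newest sequence wins."""
    target = {}
    latest_seq = {}  # highest sequence value applied per key, deletes included
    for event in events:
        key = tuple(event[k] for k in keys)
        seq = event[sequence_by]
        if key in latest_seq and seq <= latest_seq[key]:
            continue  # not newer than what was already applied for this key
        latest_seq[key] = seq
        if is_delete(event):
            target.pop(key, None)
        else:
            target[key] = event  # insert a new row or overwrite the existing one
    return target

events = [
    {"userId": 1, "name": "Ana", "operation": "INSERT", "sequenceNum": 1},
    {"userId": 2, "name": "Bo", "operation": "INSERT", "sequenceNum": 2},
    {"userId": 1, "name": "Anna", "operation": "UPDATE", "sequenceNum": 5},
    {"userId": 1, "name": "An", "operation": "UPDATE", "sequenceNum": 3},  # late
    {"userId": 2, "name": None, "operation": "DELETE", "sequenceNum": 6},
]

result = apply_changes_scd1(
    events,
    keys=["userId"],
    sequence_by="sequenceNum",
    is_delete=lambda e: e["operation"] == "DELETE",
)
print({k: v["name"] for k, v in result.items()})  # {(1,): 'Anna'}
```

Remembering the last sequence value even for deleted keys plays a role similar to the tombstones mentioned for apply_as_deletes: a late event for a deleted key does not bring the row back.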

Arguments
target

Type: str

The name of the table to be updated. You can use the create_streaming_table() function to create the target table before executing the apply_changes() function.

This parameter is required.
source

Type: str

The data source containing CDC records.

This parameter is required.
keys

Type: list

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

You can specify either:

* A list of strings: ["userId", "orderId"]
* A list of Spark SQL col() functions: [col("userId"), col("orderId")]

Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId).

This parameter is required.
sequence_by

Type: str or col()

The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order.

You can specify either:

* A string: "sequenceNum"
* A Spark SQL col() function: col("sequenceNum")

Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId).

This parameter is required.
ignore_null_updates

Type: bool

Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null will retain their existing values in the target. This also applies to nested columns with a value of null. When ignore_null_updates is False, existing values will be overwritten with null values.

This parameter is optional.

The default is False.
apply_as_deletes

Type: str or expr()

Specifies when a CDC event should be treated as a DELETE rather than an upsert. To handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones. The retention interval can be configured with the pipelines.cdc.tombstoneGCThresholdInSeconds table property.

You can specify either:

* A string: "Operation = 'DELETE'"
* A Spark SQL expr() function: expr("Operation = 'DELETE'")

This parameter is optional.
apply_as_truncates

Type: str or expr()

Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality.

The apply_as_truncates parameter is supported only for SCD type 1. SCD type 2 does not support truncate.

You can specify either:

* A string: "Operation = 'TRUNCATE'"
* A Spark SQL expr() function: expr("Operation = 'TRUNCATE'")

This parameter is optional.
column_list

except_column_list

Type: list

A subset of columns to include in the target table. Use column_list to specify the complete list of columns to include. Use except_column_list to specify the columns to exclude. You can declare either value as a list of strings or as Spark SQL col() functions:

* column_list = ["userId", "name", "city"]
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")]

Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId).

This parameter is optional.

The default is to include all columns in the target table when no column_list or except_column_list argument is passed to the function.
stored_as_scd_type

Type: str or int

Whether to store records as SCD type 1 or SCD type 2.

Set to 1 for SCD type 1 or 2 for SCD type 2.

This parameter is optional.

The default is SCD type 1.
track_history_column_list

track_history_except_column_list

Type: list

A subset of output columns to be tracked for history in the target table. Use track_history_column_list to specify the complete list of columns to be tracked. Use track_history_except_column_list to specify the columns to be excluded from tracking. You can declare either value as a list of strings or as Spark SQL col() functions:

* track_history_column_list = ["userId", "name", "city"]
* track_history_column_list = [col("userId"), col("name"), col("city")]
* track_history_except_column_list = ["operation", "sequenceNum"]
* track_history_except_column_list = [col("operation"), col("sequenceNum")]

Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId).

This parameter is optional.

The default is to include all columns in the target table when no track_history_column_list or track_history_except_column_list argument is passed to the function.
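Two of the arguments above, ignore_null_updates and column_list or except_column_list, shape a matched event before it lands in the target row. The following plain-Python sketch models both under simple assumptions. The select_columns and merge_update helpers are hypothetical and are not part of the dlt API, rows are plain dictionaries, and nested columns are not modeled.

```python
# Plain-Python model of column selection and null handling; not the dlt API.

def select_columns(row, column_list=None, except_column_list=None):
    """Keep only the requested columns; with neither argument, keep all."""
    if column_list is not None:
        return {c: row[c] for c in column_list}
    if except_column_list is not None:
        return {c: v for c, v in row.items() if c not in except_column_list}
    return dict(row)

def merge_update(existing, update, ignore_null_updates=False):
    """Apply an update to an existing row, optionally keeping values where the update is null."""
    merged = dict(existing)
    for column, value in update.items():
        if value is None and ignore_null_updates:
            continue  # retain the existing value for this column
        merged[column] = value
    return merged

existing = {"userId": 1, "name": "Ana", "city": "Lima"}
event = {"userId": 1, "name": None, "city": "Quito",
         "operation": "UPDATE", "sequenceNum": 7}

update = select_columns(event, except_column_list=["operation", "sequenceNum"])
print(merge_update(existing, update, ignore_null_updates=True))
# {'userId': 1, 'name': 'Ana', 'city': 'Quito'}
print(merge_update(existing, update, ignore_null_updates=False))
# {'userId': 1, 'name': None, 'city': 'Quito'}
```

The only difference between the two results is the name column: with ignore_null_updates set to True the existing value survives, and with False it is overwritten with null.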