Manage data quality with Delta Live Tables

You use expectations to define data quality constraints on the contents of a dataset. An expectation consists of a description, an invariant, and an action to take when a record fails the invariant. You apply expectations to queries using Python decorators or SQL constraint clauses.
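
For example, the following Python sketch (the table and source names are illustrative, not from this article) defines an expectation whose description is valid_id, whose invariant is id IS NOT NULL, and whose action is to drop records that fail the invariant:

import dlt

@dlt.table
# Description: "valid_id"; invariant: "id IS NOT NULL"; action: drop the
# record when the invariant fails.
@dlt.expect_or_drop("valid_id", "id IS NOT NULL")
def clean_events():
  # The source table is illustrative; `spark` is predefined in Databricks.
  return spark.read.table("raw_events")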

Use the expect, expect or drop, and expect or fail expectations with Python or SQL queries to define a single data quality constraint.

You can define expectations with one or more data quality constraints in Python pipelines using the @expect_all, @expect_all_or_drop, and @expect_all_or_fail decorators. These decorators accept a Python dictionary as an argument, where the key is the expectation name and the value is the expectation constraint.

You can view data quality metrics such as the number of records that violate an expectation by querying the Delta Live Tables event log.
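
For example, the following Python sketch summarizes the number of records that passed and failed each expectation. It assumes a Databricks notebook where `spark` is predefined; the event log path is a placeholder for your pipeline's storage location:

# Load the pipeline event log and register it as a temporary view.
event_log_path = "/pipelines/<pipeline-id>/system/events"  # placeholder path
spark.read.format("delta").load(event_log_path).createOrReplaceTempView("event_log_raw")

passing_failing = spark.sql("""
  SELECT
    row_expectations.dataset AS dataset,
    row_expectations.name AS expectation,
    SUM(row_expectations.passed_records) AS passing_records,
    SUM(row_expectations.failed_records) AS failing_records
  FROM (
    SELECT explode(
      from_json(
        details:flow_progress:data_quality:expectations,
        'array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>'
      )
    ) AS row_expectations
    FROM event_log_raw
    WHERE event_type = 'flow_progress'
  )
  GROUP BY row_expectations.dataset, row_expectations.name
""")
passing_failing.show()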

Retain invalid records

Use the expect operator when you want to keep records that violate the expectation. Records that violate the expectation are added to the target dataset along with valid records:

Python

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Drop invalid records

Use the expect or drop operator to prevent the processing of invalid records. Records that violate the expectation are dropped from the target dataset:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL AND current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Fail on invalid records

When invalid records are unacceptable, use the expect or fail operator to halt execution immediately when a record fails validation. If the operation is a table update, the system atomically rolls back the transaction:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

When a pipeline fails because of an expectation violation, you must fix the pipeline code to handle the invalid data correctly before re-running the pipeline.
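
One way to handle invalid data without failing the pipeline is to quarantine it: invert the expectation so that failing records flow to a separate table for inspection. The following is a minimal Python sketch; the dataset names are illustrative:

import dlt

# Rules reuse the constraints shown in this article; dataset names are illustrative.
rules = {
  "valid_count": "count > 0",
  "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"
}
# A record is quarantined when it fails any rule.
quarantine_rule = "NOT ({0})".format(" AND ".join("({0})".format(r) for r in rules.values()))

@dlt.table
@dlt.expect_all_or_drop(rules)
def valid_data():
  # Keep only records that satisfy every rule.
  return dlt.read("staged_data")

@dlt.table
@dlt.expect_or_drop("quarantined_record", quarantine_rule)
def quarantine():
  # The inverted rule keeps only records that violate at least one rule.
  return dlt.read("staged_data")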

Expect or fail expectations modify the Spark query plan of your transformations to track the information required to detect and report violations. For many queries, you can use this information to identify which input record caused the violation. The following is an example exception:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Multiple expectations

Use expect_all to specify multiple data quality constraints when records that fail validation should be included in the target dataset:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Use expect_all_or_drop to specify multiple data quality constraints when records that fail validation should be dropped from the target dataset:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Use expect_all_or_fail to specify multiple data quality constraints when records that fail validation should halt pipeline execution:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

You can also define a collection of expectations as a variable and pass it to one or more queries in your pipeline:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset