Load data with Delta Live Tables

You can load data from any data source supported by Apache Spark on Azure Databricks using Delta Live Tables. You can define datasets (tables and views) in Delta Live Tables against any query that returns a Spark DataFrame, including streaming DataFrames and pandas API on Spark DataFrames. For data ingestion, Databricks recommends streaming tables for most use cases. Streaming tables work well for ingesting data from cloud object storage using Auto Loader or from message buses such as Kafka. The examples below demonstrate some common patterns.

Important

Not all data sources have SQL support. You can mix SQL and Python notebooks in a Delta Live Tables pipeline to use SQL for all operations beyond ingestion.

For details on working with libraries not packaged in Delta Live Tables by default, see Pipeline dependencies.

Load files from cloud object storage

Databricks recommends using Auto Loader with Delta Live Tables for most data ingestion tasks from cloud object storage. Auto Loader and Delta Live Tables are designed to incrementally and idempotently load ever-growing data as it arrives in cloud storage. The following examples use Auto Loader to create datasets from CSV and JSON files:

Note

To load files with Auto Loader in a Unity Catalog enabled pipeline, you must use external locations. To learn more about using Unity Catalog with Delta Live Tables, see Use Unity Catalog with your Delta Live Tables pipelines.

Python

import dlt

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

See What is Auto Loader? and Auto Loader SQL syntax.
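
As a minimal sketch of how reader options can be layered onto the Python pattern above, the following keeps the Auto Loader option map separate from the table definition. The helper names `autoloader_options` and `define_customers_typed`, the table name `customers_typed`, and the `customer_id BIGINT` schema hint are assumptions made for illustration. `cloudFiles.schemaHints` and the CSV `header` option are standard reader options, but verify the column names against your own data.

```python
def autoloader_options(file_format, schema_hints=None, extra=None):
    """Build the option map for an Auto Loader (cloudFiles) read."""
    options = {"cloudFiles.format": file_format}
    if schema_hints:
        # Pin the types of selected columns; other columns are still inferred.
        options["cloudFiles.schemaHints"] = schema_hints
    # Format-specific reader options (for example, CSV "header") pass through as-is.
    options.update(extra or {})
    return options


def define_customers_typed(spark, path="/databricks-datasets/retail-org/customers/"):
    """Register a streaming table; call once from a pipeline notebook."""
    # Imported here because the dlt module is only available inside a pipeline run.
    import dlt

    @dlt.table(comment="Customers ingested incrementally with Auto Loader.")
    def customers_typed():
        opts = autoloader_options(
            "csv",
            schema_hints="customer_id BIGINT",  # assumed column name
            extra={"header": "true"},
        )
        return spark.readStream.format("cloudFiles").options(**opts).load(path)
```

In a pipeline notebook, calling `define_customers_typed(spark)` once at the top level would register the table alongside the others.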

Warning

If you use Auto Loader with file notifications and run a full refresh for your pipeline or streaming table, you must manually clean up your resources. You can use the CloudFilesResourceManager in a notebook to perform cleanup.

Load data from a message bus

You can configure Delta Live Tables pipelines to ingest data from message buses with streaming tables. Databricks recommends combining streaming tables with continuous execution and enhanced autoscaling to provide the most efficient ingestion for low-latency loading from message buses. See What is Enhanced Autoscaling?.

For example, the following code configures a streaming table to ingest data from Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<host:port>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

You can write downstream operations in pure SQL to perform streaming transformations on this data, as in the following example:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...
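
The Kafka source delivers `key` and `value` as binary columns, so a downstream table usually decodes the payload before filtering on it. The following Python sketch shows one way to do that, assuming the messages are JSON. The schema in `EVENT_SCHEMA`, the table name `kafka_parsed`, and the wrapper `define_kafka_parsed` are hypothetical and exist only for illustration.

```python
# Hypothetical payload schema as a Spark DDL string; replace with your own fields.
EVENT_SCHEMA = "event_id STRING, event_time TIMESTAMP, amount DOUBLE"

# Metadata columns that the Kafka source provides next to key and value.
KAFKA_METADATA_COLUMNS = ["topic", "partition", "offset", "timestamp"]


def define_kafka_parsed(schema_ddl=EVENT_SCHEMA):
    """Register a streaming table that decodes kafka_raw; call once from a pipeline notebook."""
    # Imported here because dlt and pyspark are only expected inside a pipeline run.
    import dlt
    from pyspark.sql.functions import col, from_json

    @dlt.table(comment="Kafka records with the JSON payload parsed into a struct.")
    def kafka_parsed():
        return dlt.read_stream("kafka_raw").select(
            # key and value arrive as binary, so cast them before use.
            col("key").cast("string").alias("key"),
            from_json(col("value").cast("string"), schema_ddl).alias("payload"),
            *[col(name) for name in KAFKA_METADATA_COLUMNS],
        )
```

Records whose value does not match the schema produce a null `payload` under the default parsing mode, so a later step can filter or quarantine them.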

For an example of working with Event Hubs, see Use Azure Event Hubs as a Delta Live Tables data source.

See Configure streaming data sources.

Load data from external systems

Delta Live Tables supports loading data from any data source supported by Azure Databricks. See Connect to data sources. You can also load external data using Lakehouse Federation for supported data sources. Lakehouse Federation requires Databricks Runtime 13.1 or above, so a pipeline that uses it must be configured to use the preview channel.

Some data sources do not have equivalent support in SQL. If you cannot use Lakehouse Federation with one of these data sources, you can use a standalone Python notebook to ingest data from the source. This notebook can then be added as a source library with SQL notebooks to build a Delta Live Tables pipeline. The following example declares a materialized view to access the current state of data in a remote PostgreSQL table:

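import dlt

# table_name, database_host_url, database_name, username, and password are
# placeholders that must be defined earlier in the notebook.
@dlt.table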
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Load small or static datasets from cloud object storage

You can load small or static datasets using Apache Spark load syntax. Delta Live Tables supports all of the file formats supported by Apache Spark on Azure Databricks. For a full list, see Data format options.

The following examples demonstrate loading JSON to create Delta Live Tables tables:

Python

import dlt

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Note

The SELECT * FROM format.`path`; SQL construct is common to all SQL environments on Azure Databricks. It is the recommended pattern for direct file access using SQL with Delta Live Tables.

Securely access storage credentials with secrets in a pipeline

You can use Azure Databricks secrets to store credentials such as access keys or passwords. To configure the secret in your pipeline, use a Spark property in the pipeline settings cluster configuration. See Configure your compute settings.

The following example uses a secret to store an access key required to read input data from an Azure Data Lake Storage Gen2 (ADLS Gen2) storage account using Auto Loader. You can use this same method to configure any secret required by your pipeline, for example, AWS keys to access S3, or the password to an Apache Hive metastore.

To learn more about working with Azure Data Lake Storage Gen2, see Connect to Azure Data Lake Storage Gen2 and Blob Storage.

Note

You must add the spark.hadoop. prefix to the spark_conf configuration key that sets the secret value.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> with the ADLS Gen2 storage account name.
  • <scope-name> with the Azure Databricks secret scope name.
  • <secret-name> with the name of the key containing the Azure storage account access key.

import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"

@dlt.table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> with the name of the Azure storage account container that stores the input data.
  • <storage-account-name> with the ADLS Gen2 storage account name.
  • <path-to-input-dataset> with the path to the input dataset.
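
The storage account name appears in both the `spark_conf` key and the `abfss://` path, and the two must match. As a rough sketch of that relationship, the following plain-Python helpers build both strings from the same inputs. The function names and the sample values (`mystorage`, `my-scope`, `adls-key`, `raw`) are hypothetical. The key pattern and the `{{secrets/<scope-name>/<secret-name>}}` reference syntax are taken from the pipeline settings shown above.

```python
import json


def adls_account_key_conf(storage_account, scope, secret):
    """Return the spark_conf entry that maps an ADLS Gen2 account key to a secret."""
    # The spark.hadoop. prefix is required on the configuration key.
    key = f"spark.hadoop.fs.azure.account.key.{storage_account}.dfs.core.windows.net"
    # Secret reference in the form used by pipeline settings.
    value = "{{secrets/" + scope + "/" + secret + "}}"
    return {key: value}


def abfss_path(container, storage_account, path):
    """Return the abfss:// URI for a path inside an ADLS Gen2 container."""
    return f"abfss://{container}@{storage_account}.dfs.core.windows.net/{path.lstrip('/')}"


if __name__ == "__main__":
    account = "mystorage"  # hypothetical storage account name
    print(json.dumps(adls_account_key_conf(account, "my-scope", "adls-key"), indent=2))
    print(abfss_path("raw", account, "/events/json"))
```

Generating both values from one variable avoids the common mistake of configuring the key for one storage account while reading from another.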