Create a query-based ingestion pipeline

This page shows how to create a query-based ingestion pipeline in Lakeflow Connect.

Requirements

Before you create a query-based ingestion pipeline, you must meet the following requirements:

  • Unity Catalog is enabled for your Azure Databricks workspace.
  • Your serverless compute environment allows network connectivity to the source database. See Networking and Networking recommendations for Lakehouse Federation.
  • For foreign connection ingestion: You have an existing connection to the source database or CREATE CONNECTION privileges on the metastore. See Connect to managed ingestion sources.
  • For foreign catalog ingestion: You have an existing foreign catalog registered in Lakehouse Federation or the privileges to create one.
  • You have CREATE and USE SCHEMA privileges on the destination catalog and schema.

Option 1: Foreign connection ingestion

Use this approach when you have a connection that stores authentication credentials for the source database. Supported sources include Oracle, Teradata, SQL Server, MySQL, MariaDB, and PostgreSQL.

Databricks UI

The Azure Databricks UI deploys query-based pipelines to serverless compute. To deploy on classic compute instead, see the Declarative Automation Bundles tab.

  1. In the Azure Databricks workspace sidebar, click Data Ingestion.

  2. On the Add data page, under Databricks connectors, click your source (for example, Oracle or SQL Server). The ingestion wizard opens.

  3. On the Ingestion pipeline page, enter a name for the pipeline.

  4. For Destination catalog, select a Unity Catalog catalog to store the ingested data.

  5. Select the Unity Catalog connection that stores the credentials required to access the source database.

    If there's no existing connection, click Create connection and enter the connection details. You must have CREATE CONNECTION privileges on the metastore.

  6. Click Create pipeline and continue.

  7. On the Source page, select the schemas and tables to ingest.

  8. For each table, specify the cursor column. This must be a single column with values that increase monotonically (for example, updated_at or row_id). If you don't select a monotonically increasing cursor column, the connector performs a full load on each run.

  9. Optionally, change the default history tracking setting. For more information, see Enable history tracking (SCD type 2).

  10. Click Next.

  11. On the Destination page, select the Unity Catalog catalog and schema to write to.

    If you don't want to use an existing schema, click Create schema. You must have USE CATALOG and CREATE SCHEMA privileges on the parent catalog.

  12. Click Save and continue.

  13. (Optional) On the Settings page, click Create schedule and set the refresh frequency.

  14. (Optional) Set email notifications for pipeline success or failure.

  15. Click Save and run pipeline.

Declarative Automation Bundles

Deploy a query-based ingestion pipeline using Declarative Automation Bundles. Bundles contain YAML definitions of pipelines and jobs, are managed with the Databricks CLI, and can be deployed to multiple target workspaces. For more information, see What are Declarative Automation Bundles?

This example deploys the pipeline to serverless compute (default). To deploy on classic compute instead, see the classic compute example.

  1. Create a bundle:

    databricks bundle init
    
  2. Add a pipeline definition file to the bundle (for example, resources/query_based_pipeline.yml):

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    resources:
      pipelines:
        pipeline_query_based:
          name: query-based-ingestion-pipeline
          ingestion_definition:
            connection_name: <your-uc-connection-name>
            objects:
              - table:
                  source_catalog: <source-catalog>
                  source_schema: <source-schema>
                  source_table: <source-table>
                  table_configuration:
                    query_based_connector_config:
                      cursor_columns:
                        - updated_at
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}
    
  3. Add a job definition file that controls the ingestion schedule (for example, resources/query_based_job.yml):

    resources:
      jobs:
        query_based_job:
          name: query_based_job
    
          trigger:
            periodic:
              interval: 1
              unit: HOURS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_query_based.id}
    
  4. Deploy the bundle:

    databricks bundle deploy
    
Classic compute (Beta) example

Important

Classic compute for query-based ingestion pipelines is in Beta. Databricks recommends serverless compute for most workloads.

To deploy on classic compute, set serverless: false and add a clusters block to the pipeline definition. For the full set of supported cluster fields, see Configure classic compute for pipelines.

  1. Create a bundle:

    databricks bundle init
    
  2. Add a pipeline definition file to the bundle (for example, resources/query_based_pipeline.yml):

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    resources:
      pipelines:
        pipeline_query_based:
          name: query-based-ingestion-pipeline
          serverless: false
          clusters:
            - label: default
              # Example values only. Use node types that are available in your Azure Databricks workspace.
              node_type_id: r6i.xlarge
              driver_node_type_id: i3.large
              autoscale:
                min_workers: 1
                max_workers: 5
          ingestion_definition:
            connection_name: <your-uc-connection-name>
            objects:
              - table:
                  source_catalog: <source-catalog>
                  source_schema: <source-schema>
                  source_table: <source-table>
                  table_configuration:
                    query_based_connector_config:
                      cursor_columns:
                        - updated_at
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}
    
  3. Add a job definition file that controls the ingestion schedule (for example, resources/query_based_job.yml):

    resources:
      jobs:
        query_based_job:
          name: query_based_job
    
          trigger:
            periodic:
              interval: 1
              unit: HOURS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_query_based.id}
    
  4. Deploy the bundle:

    databricks bundle deploy
    

Option 2: Foreign catalog ingestion

Use this approach when you want to ingest from a foreign catalog registered in Lakehouse Federation. Foreign catalog ingestion supports all Lakehouse Federation data sources and deletion tracking.

Databricks UI

The Azure Databricks UI deploys query-based pipelines to serverless compute. To deploy on classic compute instead, see the Declarative Automation Bundles tab.

  1. In the Azure Databricks workspace sidebar, click Data Ingestion.

  2. On the Add data page, under Databricks connectors, click your source. The ingestion wizard opens.

  3. On the Ingestion pipeline page, enter a name for the pipeline.

  4. For Destination catalog, select a Unity Catalog catalog to store the ingested data.

  5. For Connection type, select Foreign catalog, then pick the foreign catalog registered in Lakehouse Federation.

  6. Click Create pipeline and continue.

  7. On the Source page, select the schemas and tables to ingest.

  8. For each table, specify the cursor column. This must be a single column with values that increase monotonically (for example, updated_at or row_id).

  9. Optionally, change the default history tracking setting. For more information, see Enable history tracking (SCD type 2).

  10. Click Next.

  11. On the Destination page, select the Unity Catalog catalog and schema to write to.

    If you don't want to use an existing schema, click Create schema. You must have USE CATALOG and CREATE SCHEMA privileges on the parent catalog.

  12. Click Save and continue.

  13. (Optional) On the Settings page, click Create schedule and set the refresh frequency.

  14. (Optional) Set email notifications for pipeline success or failure.

  15. Click Save and run pipeline.

Declarative Automation Bundles

Deploy a foreign catalog ingestion pipeline using Declarative Automation Bundles. Bundles contain YAML definitions of pipelines and jobs, are managed with the Databricks CLI, and can be deployed to multiple target workspaces. For more information, see What are Declarative Automation Bundles?

This example deploys the pipeline to serverless compute (default). To deploy on classic compute instead, see the classic compute example.

  1. Create a bundle:

    databricks bundle init
    
  2. Add a pipeline definition file to the bundle (for example, resources/foreign_catalog_pipeline.yml):

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    resources:
      pipelines:
        pipeline_foreign_catalog:
          name: foreign-catalog-ingestion-pipeline
          ingestion_definition:
            ingest_from_uc_foreign_catalog: true
            objects:
              - table:
                  source_catalog: <foreign-catalog-name>
                  source_schema: <source-schema>
                  source_table: <source-table>
                  cursor_columns:
                    - updated_at
                  primary_keys:
                    - id
                  deletion_condition: 'deleted_at IS NOT NULL'
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}
    
  3. Add a job definition file (for example, resources/foreign_catalog_job.yml):

    resources:
      jobs:
        foreign_catalog_job:
          name: foreign_catalog_job
    
          trigger:
            periodic:
              interval: 1
              unit: HOURS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_foreign_catalog.id}
    
  4. Deploy the bundle:

    databricks bundle deploy
    
Classic compute (Beta) example

Important

Classic compute for query-based ingestion pipelines is in Beta. Databricks recommends serverless compute for most workloads.

To deploy on classic compute, set serverless: false and add a clusters block to the pipeline definition. For the full set of supported cluster fields, see Configure classic compute for pipelines.

  1. Create a bundle:

    databricks bundle init
    
  2. Add a pipeline definition file to the bundle (for example, resources/foreign_catalog_pipeline.yml):

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    resources:
      pipelines:
        pipeline_foreign_catalog:
          name: foreign-catalog-ingestion-pipeline
          serverless: false
          clusters:
            - label: default
              # Example values only. Use node types that are available in your Azure Databricks workspace.
              node_type_id: r6i.xlarge
              driver_node_type_id: i3.large
              autoscale:
                min_workers: 1
                max_workers: 5
          ingestion_definition:
            ingest_from_uc_foreign_catalog: true
            objects:
              - table:
                  source_catalog: <foreign-catalog-name>
                  source_schema: <source-schema>
                  source_table: <source-table>
                  cursor_columns:
                    - updated_at
                  primary_keys:
                    - id
                  deletion_condition: 'deleted_at IS NOT NULL'
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}
    
  3. Add a job definition file (for example, resources/foreign_catalog_job.yml):

    resources:
      jobs:
        foreign_catalog_job:
          name: foreign_catalog_job
    
          trigger:
            periodic:
              interval: 1
              unit: HOURS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_foreign_catalog.id}
    
  4. Deploy the bundle:

    databricks bundle deploy
    

Configure incremental tracking

Query-based connectors use a cursor column to determine which rows are new or updated after the last pipeline run. Your choice of cursor column is critical for effective incremental ingestion.

Consider the following when you select a cursor column:

  • Use a timestamp column, if possible. Columns like updated_at or last_modified are ideal because they directly reflect when a row was last changed.
  • Integer IDs work for append-only sources. If rows are never updated, you can use an auto-incrementing ID column (such as id or row_id) as the cursor. Don't use an integer ID as a cursor if rows can be updated without changing the ID.
  • The column must increase monotonically. Values must never decrease. If a process such as a backfill writes a cursor value that is at or below the previous high-water mark, the connector doesn't ingest that row on later runs.
  • You can only specify a single cursor column. You can't specify multiple columns as a composite cursor.

After each run, the connector stores the cursor high-water mark and uses it as the lower-bound filter (cursor_column > last_value) on the next run. Rows with a NULL cursor value are not ingested.
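
The filter behavior described above can be illustrated with a small Python simulation. This is a minimal sketch, not the connector's implementation: the function name incremental_read, the in-memory row dictionaries, and the assumption that the first run (no stored high-water mark) reads every row with a non-NULL cursor value are all hypothetical.

```python
from typing import Any, Optional


def incremental_read(rows: list[dict], cursor_column: str, last_value: Optional[Any]):
    """Hypothetical model of one pipeline run.

    Returns the rows that would be ingested and the new high-water mark.
    """
    selected = []
    for row in rows:
        value = row[cursor_column]
        if value is None:
            # Rows with a NULL cursor value are never ingested.
            continue
        # Assumption: with no stored mark (first run), every non-NULL row is read.
        if last_value is None or value > last_value:
            selected.append(row)
    # Keep the old mark if nothing new was read.
    new_mark = max((r[cursor_column] for r in selected), default=last_value)
    return selected, new_mark


# Integer timestamps keep the example short; real cursors are often TIMESTAMP columns.
source = [
    {"id": 1, "updated_at": 10},
    {"id": 2, "updated_at": 20},
    {"id": 3, "updated_at": None},  # NULL cursor: skipped on every run
]
first_batch, first_mark = incremental_read(source, "updated_at", None)

source += [
    {"id": 4, "updated_at": 30},  # new row above the mark: ingested
    {"id": 5, "updated_at": 15},  # backfilled with a past value: missed
]
second_batch, second_mark = incremental_read(source, "updated_at", first_mark)

print([r["id"] for r in first_batch], first_mark)    # [1, 2] 20
print([r["id"] for r in second_batch], second_mark)  # [4] 30
```

In this sketch, row 5 is never picked up because its cursor value (15) is below the stored high-water mark (20), which is why the cursor column must never decrease.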

Configure history tracking (SCD)

To track the full history of row changes in destination tables, configure SCD type 2. See Enable history tracking (SCD type 2).
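
To show what history tracking changes in the destination, the following sketch contrasts overwriting a row in place (SCD type 1) with keeping every version of a row (SCD type 2). It is an illustration under stated assumptions, not the pipeline's implementation: the functions apply_scd1 and apply_scd2 and the start_at and end_at column names are hypothetical, and the columns in actual destination tables may differ.

```python
def apply_scd1(current: dict, change: dict, key: str) -> None:
    """SCD type 1: keep only the latest version of each row."""
    current[change[key]] = dict(change)


def apply_scd2(history: list, change: dict, key: str, sequence_column: str) -> None:
    """SCD type 2: close the open version of the row, then append the new one."""
    for version in history:
        if version[key] == change[key] and version["end_at"] is None:
            version["end_at"] = change[sequence_column]
    # start_at / end_at are hypothetical validity columns used for illustration.
    history.append({**change, "start_at": change[sequence_column], "end_at": None})


changes = [
    {"id": 1, "status": "new", "updated_at": 1},
    {"id": 1, "status": "shipped", "updated_at": 2},
]

current: dict = {}
history: list = []
for change in changes:
    apply_scd1(current, change, key="id")
    apply_scd2(history, change, key="id", sequence_column="updated_at")

print(current[1]["status"])
# shipped
print([(v["status"], v["start_at"], v["end_at"]) for v in history])
# [('new', 1, 2), ('shipped', 2, None)]
```

With type 1, only the latest status survives. With type 2, the earlier version stays in the table with a closed validity range, and the open version (end_at of None in this sketch) is the current row.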

Common patterns

For advanced pipeline configurations, see Common patterns for managed ingestion pipelines.

Additional resources