This page shows how to create a query-based ingestion pipeline in Lakeflow Connect.
Requirements
Before you create a query-based ingestion pipeline, you must meet the following requirements:
- Unity Catalog is enabled for your Azure Databricks workspace.
- Your serverless compute environment allows network connectivity to the source database. See Networking and Networking recommendations for Lakehouse Federation.
- For foreign connection ingestion: You have an existing connection to the source database or CREATE CONNECTION privileges on the metastore. See Connect to managed ingestion sources.
- For foreign catalog ingestion: You have an existing foreign catalog registered in Lakehouse Federation or the privileges to create one.
- You have CREATE and USE SCHEMA privileges on the destination catalog and schema.
Option 1: Foreign connection ingestion
Use this approach when you have a connection that stores authentication credentials for the source database. Supported sources include Oracle, Teradata, SQL Server, MySQL, MariaDB, and PostgreSQL.
Databricks UI
The Azure Databricks UI deploys query-based pipelines to serverless compute. To deploy on classic compute instead, see the Declarative Automation Bundles tab.
In the Azure Databricks workspace sidebar, click Data Ingestion.
On the Add data page, under Databricks connectors, click your source (for example, Oracle or SQL Server). The ingestion wizard opens.
On the Ingestion pipeline page, enter a name for the pipeline.
For Destination catalog, select a Unity Catalog catalog to store the ingested data.
Select the Unity Catalog connection that stores the credentials required to access the source database.
If there's no existing connection, click Create connection and enter the connection details. You must have CREATE CONNECTION privileges on the metastore.
Click Create pipeline and continue.
On the Source page, select the schemas and tables to ingest.
For each table, specify the cursor column. This must be a single column with values that increase monotonically (for example, updated_at or row_id). If you don't select a monotonically increasing cursor column, the connector performs a full load on each run.
Optionally, change the default history tracking setting. For more information, see Enable history tracking (SCD type 2).
Click Next.
On the Destination page, select the Unity Catalog catalog and schema to write to.
If you don't want to use an existing schema, click Create schema. You must have USE CATALOG and CREATE SCHEMA privileges on the parent catalog.
Click Save and continue.
(Optional) On the Settings page, click Create schedule and set the refresh frequency.
(Optional) Set email notifications for pipeline success or failure.
Click Save and run pipeline.
Declarative Automation Bundles
Deploy a query-based ingestion pipeline using Declarative Automation Bundles. Bundles contain YAML definitions of pipelines and jobs, are managed with the Databricks CLI, and can be deployed to multiple target workspaces. For more information, see What are Declarative Automation Bundles?.
This example deploys the pipeline to serverless compute (default). To deploy on classic compute instead, see the classic compute example.
Create a bundle:

    databricks bundle init

Add a pipeline definition file to the bundle (for example, resources/query_based_pipeline.yml):

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    resources:
      pipelines:
        pipeline_query_based:
          name: query-based-ingestion-pipeline
          ingestion_definition:
            connection_name: <your-uc-connection-name>
            objects:
              - table:
                  source_catalog: <source-catalog>
                  source_schema: <source-schema>
                  source_table: <source-table>
                  table_configuration:
                    query_based_connector_config:
                      cursor_columns:
                        - updated_at
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}

Add a job definition file that controls the ingestion schedule (for example, resources/query_based_job.yml):

    resources:
      jobs:
        query_based_job:
          name: query_based_job
          trigger:
            periodic:
              interval: 1
              unit: HOURS
          email_notifications:
            on_failure:
              - <email-address>
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_query_based.id}

Deploy the bundle:

    databricks bundle deploy
Classic compute (Beta) example
Important
Classic compute for query-based ingestion pipelines is in Beta. Databricks recommends serverless compute for most workloads.
To deploy on classic compute, set serverless: false and add a clusters block to the pipeline definition. For the full set of supported cluster fields, see Configure classic compute for pipelines.
Create a bundle:

    databricks bundle init

Add a pipeline definition file to the bundle (for example, resources/query_based_pipeline.yml):

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    resources:
      pipelines:
        pipeline_query_based:
          name: query-based-ingestion-pipeline
          serverless: false
          clusters:
            - label: default
              node_type_id: r6i.xlarge
              driver_node_type_id: i3.large
              autoscale:
                min_workers: 1
                max_workers: 5
          ingestion_definition:
            connection_name: <your-uc-connection-name>
            objects:
              - table:
                  source_catalog: <source-catalog>
                  source_schema: <source-schema>
                  source_table: <source-table>
                  table_configuration:
                    query_based_connector_config:
                      cursor_columns:
                        - updated_at
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}

Add a job definition file that controls the ingestion schedule (for example, resources/query_based_job.yml):

    resources:
      jobs:
        query_based_job:
          name: query_based_job
          trigger:
            periodic:
              interval: 1
              unit: HOURS
          email_notifications:
            on_failure:
              - <email-address>
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_query_based.id}

Deploy the bundle:

    databricks bundle deploy
Option 2: Foreign catalog ingestion
Use this approach when you want to ingest from a foreign catalog registered in Lakehouse Federation. Foreign catalog ingestion supports all Lakehouse Federation data sources and supports deletion tracking.
Databricks UI
The Azure Databricks UI deploys query-based pipelines to serverless compute. To deploy on classic compute instead, see the Declarative Automation Bundles tab.
In the Azure Databricks workspace sidebar, click Data Ingestion.
On the Add data page, under Databricks connectors, click your source. The ingestion wizard opens.
On the Ingestion pipeline page, enter a name for the pipeline.
For Destination catalog, select a Unity Catalog catalog to store the ingested data.
For Connection type, select Foreign catalog, then pick the foreign catalog registered in Lakehouse Federation.
Click Create pipeline and continue.
On the Source page, select the schemas and tables to ingest.
For each table, specify the cursor column. This must be a single column with values that increase monotonically (for example, updated_at or row_id).
Optionally, change the default history tracking setting. For more information, see Enable history tracking (SCD type 2).
Click Next.
On the Destination page, select the Unity Catalog catalog and schema to write to.
If you don't want to use an existing schema, click Create schema. You must have USE CATALOG and CREATE SCHEMA privileges on the parent catalog.
Click Save and continue.
(Optional) On the Settings page, click Create schedule and set the refresh frequency.
(Optional) Set email notifications for pipeline success or failure.
Click Save and run pipeline.
Declarative Automation Bundles
Deploy a foreign catalog ingestion pipeline using Declarative Automation Bundles. Bundles contain YAML definitions of pipelines and jobs, are managed with the Databricks CLI, and can be deployed to multiple target workspaces. For more information, see What are Declarative Automation Bundles?.
This example deploys the pipeline to serverless compute (default). To deploy on classic compute instead, see the classic compute example.
Create a bundle:

    databricks bundle init

Add a pipeline definition file to the bundle (for example, resources/foreign_catalog_pipeline.yml):

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    resources:
      pipelines:
        pipeline_foreign_catalog:
          name: foreign-catalog-ingestion-pipeline
          ingestion_definition:
            ingest_from_uc_foreign_catalog: true
            objects:
              - table:
                  source_catalog: <foreign-catalog-name>
                  source_schema: <source-schema>
                  source_table: <source-table>
                  cursor_columns:
                    - updated_at
                  primary_keys:
                    - id
                  deletion_condition: 'deleted_at IS NOT NULL'
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}

Add a job definition file (for example, resources/foreign_catalog_job.yml):

    resources:
      jobs:
        foreign_catalog_job:
          name: foreign_catalog_job
          trigger:
            periodic:
              interval: 1
              unit: HOURS
          email_notifications:
            on_failure:
              - <email-address>
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_foreign_catalog.id}

Deploy the bundle:

    databricks bundle deploy
Classic compute (Beta) example
Important
Classic compute for query-based ingestion pipelines is in Beta. Databricks recommends serverless compute for most workloads.
To deploy on classic compute, set serverless: false and add a clusters block to the pipeline definition. For the full set of supported cluster fields, see Configure classic compute for pipelines.
Create a bundle:

    databricks bundle init

Add a pipeline definition file to the bundle (for example, resources/foreign_catalog_pipeline.yml):

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    resources:
      pipelines:
        pipeline_foreign_catalog:
          name: foreign-catalog-ingestion-pipeline
          serverless: false
          clusters:
            - label: default
              node_type_id: r6i.xlarge
              driver_node_type_id: i3.large
              autoscale:
                min_workers: 1
                max_workers: 5
          ingestion_definition:
            ingest_from_uc_foreign_catalog: true
            objects:
              - table:
                  source_catalog: <foreign-catalog-name>
                  source_schema: <source-schema>
                  source_table: <source-table>
                  cursor_columns:
                    - updated_at
                  primary_keys:
                    - id
                  deletion_condition: 'deleted_at IS NOT NULL'
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}

Add a job definition file (for example, resources/foreign_catalog_job.yml):

    resources:
      jobs:
        foreign_catalog_job:
          name: foreign_catalog_job
          trigger:
            periodic:
              interval: 1
              unit: HOURS
          email_notifications:
            on_failure:
              - <email-address>
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_foreign_catalog.id}

Deploy the bundle:

    databricks bundle deploy
Configure incremental tracking
Query-based connectors use a cursor column to determine which rows are new or updated after the last pipeline run. Your choice of cursor column is critical for effective incremental ingestion.
Consider the following when you select a cursor column:
- Use a timestamp column, if possible. Columns like updated_at or last_modified are ideal because they directly reflect when a row was last changed.
- Integer IDs work for append-only sources. If rows are never updated, you can use an auto-incrementing ID column (such as id or row_id) as the cursor. Don't use an integer ID as a cursor if rows can be updated without changing the ID.
- The column must increase monotonically. Values must never decrease. If a process such as a backfill sets the column to a past value, the connector doesn't reingest rows written before the previous high-water mark.
- You can only specify a single cursor column. You can't specify multiple columns as a composite cursor.
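The considerations above can be turned into a quick pre-check on a sample of source values. The following is a minimal sketch, not part of Lakeflow Connect: the helper name check_cursor_candidate and its return fields are hypothetical, and it assumes you can pull a sample of the candidate column's values in the order the rows were written.

```python
from typing import Optional, Sequence


def check_cursor_candidate(values: Sequence[Optional[int]]) -> dict:
    """Inspect sampled cursor values, listed in the order the rows were written.

    Hypothetical helper for illustration only; not a Lakeflow Connect API.
    """
    non_null = [v for v in values if v is not None]
    # A value lower than its predecessor means the column moved backwards.
    decreases = sum(1 for prev, cur in zip(non_null, non_null[1:]) if cur < prev)
    return {
        "null_count": len(values) - len(non_null),
        "decreases": decreases,
        # Conservative verdict: no NULLs and no backwards steps in the sample.
        "usable": decreases == 0 and len(non_null) == len(values),
    }


append_only_ids = [1, 2, 3, 4]
backfilled = [10, 20, 5, 30]  # 5 was written after 20, for example by a backfill
with_nulls = [1, None, 3]

for sample in (append_only_ids, backfilled, with_nulls):
    print(sample, check_cursor_candidate(sample))
```

A clean sample does not prove that the column is monotonic in general. It only helps to rule out obviously unsuitable candidates before you configure the pipeline.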
After the connector stores the cursor high-water mark, it uses the high-water mark as the lower bound filter (cursor_column > last_value) on the next run. Rows with a NULL cursor value are not ingested.
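To make the filter concrete, here is a small in-memory simulation of the high-water mark behavior described above. It is a sketch under stated assumptions, not the connector's implementation: the function incremental_read and the row layout are hypothetical, and it assumes that a first run with no stored mark reads every row that has a non-NULL cursor value.

```python
from typing import Optional


def incremental_read(rows: list, last_value: Optional[int]):
    """Return (selected_rows, new_high_water_mark) for one simulated run.

    Hypothetical illustration of the cursor_column > last_value filter.
    """
    selected = [
        r for r in rows
        if r["cursor"] is not None  # NULL cursor values are never selected
        and (last_value is None or r["cursor"] > last_value)
    ]
    # Keep the previous mark when nothing new was selected.
    new_mark = max((r["cursor"] for r in selected), default=last_value)
    return selected, new_mark


source = [
    {"id": 1, "cursor": 100},
    {"id": 2, "cursor": 200},
    {"id": 3, "cursor": None},  # None models a SQL NULL cursor value
]

first_run, mark = incremental_read(source, None)
print([r["id"] for r in first_run], mark)

# Between runs: one new row, plus one row backfilled with a past cursor value.
source.append({"id": 4, "cursor": 300})
source.append({"id": 5, "cursor": 150})

second_run, mark = incremental_read(source, mark)
print([r["id"] for r in second_run], mark)
```

In this simulation the second run selects only the row with cursor value 300. The backfilled row (150) falls below the stored mark of 200 and the NULL row is skipped on both runs, which is why the cursor column must never decrease.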
Configure history tracking (SCD)
To track the full history of row changes in destination tables, configure SCD type 2. See Enable history tracking (SCD type 2).
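As a rough illustration of what SCD type 2 means for a destination table, the sketch below keeps every version of a row and closes the previous version when a value changes. It shows the general technique only: the function apply_scd2 and the field names start_at and end_at are hypothetical and do not describe the columns that Lakeflow Connect writes.

```python
def apply_scd2(history: list, key, new_value, changed_at: int) -> None:
    """Close the open version of `key` (if any) and append the new version.

    Hypothetical illustration of SCD type 2; not the connector's schema.
    """
    for version in history:
        if version["key"] == key and version["end_at"] is None:
            if version["value"] == new_value:
                return  # Value unchanged: keep the open version as-is.
            version["end_at"] = changed_at  # Close the superseded version.
    history.append(
        {"key": key, "value": new_value, "start_at": changed_at, "end_at": None}
    )


history = []
apply_scd2(history, key=42, new_value="bronze", changed_at=1)
apply_scd2(history, key=42, new_value="gold", changed_at=5)

for version in history:
    print(version)
```

After the two calls, the history holds both versions of key 42: the first is closed at time 5 and the second remains open. An SCD type 1 table would instead keep only the latest value.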
Common patterns
For advanced pipeline configurations, see Common patterns for managed ingestion pipelines.