Create an integrated CDC pipeline for MySQL

Important

This feature is in Beta. Workspace admins can control access to this feature from the Previews page. See Manage Azure Databricks previews.

An integrated CDC pipeline ingests change data from MySQL into Azure Databricks using a single pipeline. Unlike the standard gateway-based architecture, an integrated CDC pipeline runs both extraction and application stages in one pipeline update. The standard architecture requires a separate ingestion gateway and ingestion pipeline.

When to use the integrated CDC connector

The following table compares integrated CDC pipelines with the standard gateway-based architecture:

Feature Standard CDC (gateway-based) Integrated CDC
Number of pipelines Two (ingestion gateway and ingestion pipeline) One (unified pipeline)
Setup Create a gateway, then create an ingestion pipeline that references the gateway ID Create a single pipeline that references a Unity Catalog connection
Gateway mode The gateway runs continuously as a separate long-running process Extraction is embedded in each scheduled pipeline update
Connection reference ingestion_gateway_id connection_name (a Unity Catalog connection)
Connector type Implicit default CDC behavior Explicit: connector_type: CDC
Staging volume Managed internally by the gateway Auto-created in the destination schema, or configured through data_staging_options
Pipeline mode Continuous Triggered only
Compute Classic for the gateway, serverless for the managed ingestion pipeline Classic compute only. Serverless is not supported.
Auto full refresh Not supported for existing MySQL gateway-based flows Supported
Maximum tables 250 per pipeline 250 per pipeline
SCD Type 2 Not supported Not supported
Authentication Username/password Username/password

For source database setup, see Configure MySQL for ingestion into Azure Databricks. The same source configuration applies to both architectures.

How an integrated CDC pipeline runs

Each pipeline update runs two stages in sequence:

  1. Extraction. The pipeline connects to the source database using the Unity Catalog connection. On the first run or a full refresh, it captures an initial snapshot. On subsequent runs, it captures incremental changes (inserts, updates, and deletes) using the binary log (binlog). The pipeline writes extracted data to a Unity Catalog staging volume.
  2. Application. The pipeline reads from the staging volume and applies changes to destination streaming tables in Unity Catalog. Merge operations use the configured primary keys and SCD type. The pipeline guarantees exactly-once semantics.

During the Beta period, each pipeline update has a maximum runtime of approximately 30 minutes. If the source has more changes than a single update can process, the next scheduled update resumes where the previous one stopped. To ingest data on a recurring basis, schedule the pipeline using a Lakeflow Jobs task.

Requirements

  • Your workspace is enabled for Unity Catalog.

  • If you plan to create a connection: You have CREATE CONNECTION privileges on the metastore. See Manage privileges in Unity Catalog.

    If your connector supports UI-based pipeline authoring, you can create the connection and the pipeline at the same time by completing the steps on this page. However, if you use API-based pipeline authoring, you must create the connection in Catalog Explorer before you complete the steps on this page. See Connect to managed ingestion sources.

  • If you plan to use an existing connection: You have USE CONNECTION privileges or ALL PRIVILEGES on the connection.

  • You have USE CATALOG privileges on the target catalog.

  • You have USE SCHEMA, CREATE TABLE, and CREATE VOLUME privileges on an existing schema or CREATE SCHEMA privileges on the target catalog.

  • Your workspace must have the integrated CDC connector feature enabled. Contact your Azure Databricks account team.
  • You have access to the primary MySQL instance. The integrated CDC connector does not support read replicas.
  • Enable binary logging on the source database with binlog_format=ROW and binlog_row_image=FULL.
  • You have completed the MySQL source setup. See Configure MySQL for ingestion into Azure Databricks.
  • You have the following permissions:
    • CREATE CONNECTION on the metastore (if creating a new Unity Catalog connection), or USE CONNECTION on an existing connection.
    • USE CATALOG on the destination catalog.
    • USE SCHEMA and CREATE TABLE on the destination schema.
    • CREATE VOLUME on the destination schema, or on the schema specified in data_staging_options. A staging volume is required even if data_staging_options is not set, because the pipeline autocreates one in the destination schema.

Compute requirements

Integrated CDC pipelines for MySQL require classic compute. Serverless compute is not supported.

  • Classic compute: The classic compute plane runs in your Azure Databricks workspace VPC or VNet and must reach your MySQL instance over the network. Supported network paths include VPC or VNet peering, public endpoints, and, for on-premises MySQL, AWS Direct Connect, Azure ExpressRoute, or VPN.

For classic compute, use unrestricted cluster creation permissions or a custom cluster policy with cluster_type fixed to dlt and runtime_engine fixed to STANDARD. Databricks recommends at least 8 cores for efficient extraction.

Create a Unity Catalog connection to MySQL

Create a Unity Catalog connection to MySQL before creating a pipeline. See Create a MySQL connection.

Create an integrated CDC pipeline

Create integrated CDC pipelines using the API, the Databricks CLI, notebooks, or Declarative Automation Bundles. UI creation is not yet available.

Important

All pipeline creation requests must include "channel": "PREVIEW".

Declarative Automation Bundles

Define the pipeline resource in a bundle file (for example, resources/integrated_cdc_pipeline.yml):

variables:
  pipeline_name:
    description: 'Name for the integrated CDC pipeline'
  connection_name:
    description: 'Unity Catalog connection name'
  dest_catalog:
    description: 'Destination catalog for ingested data'
  dest_schema:
    description: 'Destination schema for ingested data'

resources:
  pipelines:
    integrated_cdc_pipeline:
      name: ${var.pipeline_name}
      pipeline_type: MANAGED_INGESTION
      channel: PREVIEW
      serverless: false
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}
      ingestion_definition:
        connection_name: ${var.connection_name}
        connector_type: CDC
        objects:
          - table:
              source_schema: 'my_database'
              source_table: 'customers'
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
              destination_table: 'customers'
              table_configuration:
                primary_keys:
                  - 'customer_id'
                scd_type: 'SCD_TYPE_1'

To run the pipeline on a schedule, define a job (for example, resources/integrated_cdc_job.yml) that triggers the pipeline. Because each extraction stage runs for at least 10 minutes, an interval of 60 minutes or longer is a good starting point:

resources:
  jobs:
    integrated_cdc_job:
      name: '${var.pipeline_name}-job'
      tasks:
        - task_key: 'cdc_ingestion'
          pipeline_task:
            pipeline_id: ${resources.pipelines.integrated_cdc_pipeline.id}
      schedule:
        quartz_cron_expression: '0 0 * * * ?'
        timezone_id: 'UTC'

Deploy the bundle with the Databricks CLI:

databricks bundle deploy
databricks bundle run integrated_cdc_job

For more information, see What are Declarative Automation Bundles?.

Databricks notebook

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

pipeline = w.pipelines.create(
    name="<pipeline-name>",
    pipeline_type="MANAGED_INGESTION",
    channel="PREVIEW",
    serverless=False,
    catalog="<destination-catalog>",
    schema="<destination-schema>",
    ingestion_definition={
        "connection_name": "<unity-catalog-connection-name>",
        "connector_type": "CDC",
        "objects": [
            {
                "table": {
                    "source_schema": "<source-database>",
                    "source_table": "<source-table>",
                }
            }
        ],
    },
)

print(f"Pipeline created: {pipeline.pipeline_id}")

Databricks CLI

databricks pipelines create --json '{
  "name": "<pipeline-name>",
  "pipeline_type": "MANAGED_INGESTION",
  "channel": "PREVIEW",
  "serverless": false,
  "catalog": "<destination-catalog>",
  "schema": "<destination-schema>",
  "ingestion_definition": {
    "connection_name": "<unity-catalog-connection-name>",
    "connector_type": "CDC",
    "objects": [
      {
        "table": {
          "source_schema": "<source-database>",
          "source_table": "<source-table>"
        }
      }
    ]
  }
}'

REST API

The following example replicates two tables from a MySQL database. Both inherit the top-level destination main.ingestion. "serverless": false is required for MySQL integrated CDC pipelines.

POST /api/2.0/pipelines

{
  "name": "my-integrated-cdc-pipeline",
  "pipeline_type": "MANAGED_INGESTION",
  "channel": "PREVIEW",
  "serverless": false,
  "catalog": "main",
  "schema": "ingestion",
  "ingestion_definition": {
    "connection_name": "my-mysql-connection",
    "connector_type": "CDC",
    "objects": [
      {
        "table": {
          "source_schema": "my_database",
          "source_table": "customers",
          "table_configuration": {
            "primary_keys": ["customer_id"],
            "scd_type": "SCD_TYPE_1"
          }
        }
      },
      {
        "table": {
          "source_schema": "my_database",
          "source_table": "orders",
          "table_configuration": {
            "primary_keys": ["order_id"],
            "scd_type": "SCD_TYPE_1"
          }
        }
      }
    ],
    "data_staging_options": {
      "catalog_name": "main",
      "schema_name": "ingestion_staging"
    }
  }
}

To replicate every table in a source database, use a schema object instead of individual table objects:

POST /api/2.0/pipelines

{
  "name": "my-integrated-cdc-schema-pipeline",
  "pipeline_type": "MANAGED_INGESTION",
  "channel": "PREVIEW",
  "serverless": false,
  "catalog": "main",
  "schema": "ingestion",
  "ingestion_definition": {
    "connection_name": "my-mysql-connection",
    "connector_type": "CDC",
    "objects": [
      {
        "schema": {
          "source_schema": "my_database",
          "destination_catalog": "main",
          "destination_schema": "ingestion"
        }
      }
    ]
  }
}

To start a pipeline update:

POST /api/2.0/pipelines/<pipeline-id>/updates

{
  "full_refresh": false
}

Schedule recurring updates

Integrated CDC pipelines run in triggered mode only. To ingest data on a recurring schedule, create a Lakeflow Jobs task that runs the pipeline. Each update runs for approximately 30 minutes and might not finish processing the full change backlog in a single update. Schedule pipelines at 60-minute intervals or more frequently for subsequent updates to catch up. If a trigger fires while a previous update is still running, the new update is queued.

Configuration reference

Pipeline parameters

Parameter Type Description
name string A name for the pipeline.
pipeline_type string Must be MANAGED_INGESTION.
channel string Must be PREVIEW.
serverless Boolean Must be false for MySQL integrated CDC pipelines. Serverless compute is not supported.
catalog string The default destination catalog. Used when a per-table destination_catalog is not specified.
schema string The default destination schema. Used when a per-table destination_schema is not specified.
ingestion_definition.connection_name string The Unity Catalog connection to the source database.
ingestion_definition.connector_type string Must be CDC.
ingestion_definition.objects array The list of tables or schemas to ingest.
ingestion_definition.data_staging_options object Optional. The catalog and schema where the pipeline creates the staging volume. Defaults to the pipeline's destination schema.

Table specification

Parameter Required Description
source_schema Yes The source MySQL database name.
source_table Yes The source table name.
destination_catalog No The destination catalog. Defaults to the pipeline's catalog.
destination_schema No The destination schema. Defaults to the pipeline's schema.
destination_table No The destination table name. Defaults to source_table.

Table configuration

Parameter Default Description
primary_keys Autodetected The columns that identify each row. Autodetected from the source primary key if not specified.
scd_type SCD_TYPE_1 SCD_TYPE_1 keeps the latest version only. SCD Type 2 is not supported for MySQL integrated CDC pipelines.
sequence_by Autodetected The columns used to order CDC events. Autodetected based on the source CDC mechanism if not specified.
auto_full_refresh_policy Disabled Configures automatic full refresh when unsupported DDL operations are detected. See Auto full refresh policy.

For MySQL data type mappings, see MySQL connector reference. Integrated CDC pipelines support automatic type widening: when a source column type is widened (for example, INT to BIGINT), the destination table adapts automatically.

Monitor the pipeline

After you create and start an integrated CDC pipeline, monitor its status using the following:

  • Azure Databricks UI. Open the pipeline in the Pipelines section to view update status, per-table ingestion metrics, and lineage.

  • REST API.

    GET /api/2.0/pipelines/<pipeline-id>
    
  • Events API.

    GET /api/2.0/pipelines/<pipeline-id>/events
    

The first pipeline update performs a full snapshot of all selected tables, which can take longer than incremental updates. For large tables, the initial snapshot might require multiple scheduled updates to complete. Each subsequent update picks up where the previous one left off.

To verify ingestion:

-- Check row counts in the destination table
SELECT COUNT(*) FROM <destination_catalog>.<destination_schema>.<destination_table>;

-- View recent changes (SCD Type 1 tables)
SELECT * FROM <destination_catalog>.<destination_schema>.<destination_table>
ORDER BY __START_AT DESC
LIMIT 10;

For full refresh and auto full refresh behavior, see Fully refresh target tables.

Integrated CDC pipelines enable vertical autoscaling by default. If a pipeline update fails because of an out-of-memory condition, the next update automatically provisions a larger driver. To override this behavior, use a custom cluster policy.

Limitations

  • Beta. The integrated CDC connector requires workspace-level enablement. Contact your Azure Databricks account team.
  • Triggered mode only. Integrated CDC pipelines do not support continuous (always-on) execution. Schedule pipelines using a Lakeflow Jobs task.
  • API-only creation. Pipeline creation is available through the REST API, the Databricks CLI, notebooks, and Declarative Automation Bundles. UI creation is not yet supported.
  • Channel must be PREVIEW. Pipeline specs must include "channel": "PREVIEW".
  • Connection and connector type are immutable. connection_name and connector_type cannot be changed after the pipeline is created. To change the source, create a new pipeline.
  • Maximum of 250 tables per pipeline.
  • Primary instances only. The integrated CDC connector does not support read replicas. Connect to the primary MySQL instance.
  • SCD Type 2 is not supported.
  • Tables without primary keys. The pipeline treats all non-LOB columns as a composite key. Duplicate rows might collapse to a single row.
  • Initial snapshot might span multiple updates. For large tables, the initial snapshot might not finish in a single update. Subsequent scheduled updates resume where the previous update left off.
  • Each update runs for approximately 30 minutes. During Beta, the pipeline does not necessarily process the entire change backlog in a single update. Subsequent scheduled updates resume processing where the previous update left off. You cannot configure this runtime during Beta.
  • Binlog purge requires full refresh. If the MySQL binary log is purged before the pipeline processes changes, perform a full refresh on the affected tables. The pipeline detects this condition and surfaces an error in the event log.
  • Serverless compute is not supported. MySQL integrated CDC pipelines require classic compute.

Troubleshooting

Note

Some error codes use the INGESTION_GATEWAY_ prefix. This is a legacy naming convention and does not indicate that a separate ingestion gateway is required.

Error Cause Resolution
NOT_IN_DEFAULT_PUBLISHING_MODE The pipeline is not in Direct Publishing Mode. Direct Publishing Mode is set automatically for integrated CDC pipelines. If you see this error, recreate the pipeline.
INGESTION_GATEWAY_CDC_NOT_ENABLED Binary logging is not enabled or binlog_format is not set to ROW. Enable binary logging with binlog_format=ROW and binlog_row_image=FULL. See Configure MySQL for ingestion into Azure Databricks.
INGESTION_GATEWAY_MISSING_TABLE_IN_SOURCE The specified source table does not exist or has been dropped. Verify that the table exists and that the connection user has access.
INGESTION_GATEWAY_SOURCE_SCHEMA_MISSING_ENTITY The source schema does not exist. Verify the schema exists in the source database.
UNSUPPORTED_SOURCE_TYPE_FOR_CDC_CONNECTOR The source database type is not supported. The integrated CDC connector supports MySQL, SQL Server, and Oracle.
SOURCE_TABLE_REQUIRED The table specification is missing source_table. Add source_table to each table specification in the objects array.
Integrated CDC connector is disabled The workspace feature flag is not enabled. Contact your Azure Databricks account team to enable the integrated CDC connector on your workspace.

If you encounter an issue not covered here:

  1. Review the pipeline event log in the Azure Databricks UI or through GET /api/2.0/pipelines/<pipeline-id>/events.
  2. Test the Unity Catalog connection from Catalog Explorer to confirm the source is reachable.
  3. Confirm that binary logging is enabled on the source database with binlog_format=ROW and binlog_row_image=FULL.
  4. Verify that the database user has the MySQL permissions listed in Grant MySQL user privileges.
  5. Check that your pipeline spec includes "channel": "PREVIEW".

Additional resources