Get started with change data capture in the analytical store for Azure Cosmos DB

APPLIES TO: NoSQL, MongoDB

Use change data capture (CDC) in Azure Cosmos DB analytical store as a source for Azure Data Factory or Azure Synapse Analytics to capture specific changes to your data.

Note

The linked service interface for the Azure Cosmos DB for MongoDB API isn't available in data flows yet. As a workaround until the MongoDB linked service is supported, you can use your account's document endpoint with the Azure Cosmos DB for NoSQL linked service interface. In the NoSQL linked service, choose Enter Manually to provide the Azure Cosmos DB account information, and use the account's document endpoint (for example: https://[your-database-account-uri].documents.azure.com:443/) instead of the MongoDB endpoint (for example: mongodb://[your-database-account-uri].mongo.cosmos.azure.com:10255/).

Prerequisites

Enable analytical store

First, enable Azure Synapse Link at the account level, and then enable analytical store for the containers that are appropriate for your workload.

  1. Enable Azure Synapse Link: Enable Azure Synapse Link for an Azure Cosmos DB account

  2. Enable analytical store for your containers (a scripted alternative for new containers is sketched after these steps):

    Option | Guide
    Enable for a specific new container | Enable Azure Synapse Link for your new containers
    Enable for a specific existing container | Enable Azure Synapse Link for your existing containers
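
If you prefer to create a new container with analytical store already enabled from code instead of the portal, the following Python sketch uses the azure-cosmos SDK. It's a minimal illustration, not part of this guide's portal flow: the endpoint, key, and partition key path are placeholders, and the database and container names match the cosmicworks and products samples used later in this article.

    # Minimal sketch, assuming the azure-cosmos Python SDK (pip install azure-cosmos)
    # and that Azure Synapse Link is already enabled on the account.
    # The endpoint, key, and partition key path are placeholders.
    from azure.cosmos import CosmosClient, PartitionKey

    client = CosmosClient(
        url="https://<your-account>.documents.azure.com:443/",
        credential="<account-key>",
    )
    database = client.get_database_client("cosmicworks")

    # analytical_storage_ttl=-1 enables analytical store on the new container and
    # retains the analytical data indefinitely.
    container = database.create_container(
        id="products",
        partition_key=PartitionKey(path="/categoryId"),
        analytical_storage_ttl=-1,
    )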

Create a target Azure resource using data flows

The change data capture feature of the analytical store is available through the data flow feature of Azure Data Factory or Azure Synapse Analytics. For this guide, use Azure Data Factory.

Important

You can alternatively use Azure Synapse Analytics. First, create an Azure Synapse workspace, if you don't already have one. Within the newly created workspace, select the Develop tab, select Add new resource, and then select Data flow.

  1. Create an Azure Data Factory, if you don't already have one.

    Tip

    If possible, create the data factory in the same region where your Azure Cosmos DB account resides.

  2. Launch the newly created data factory.

  3. In the data factory, select the Data flows tab, and then select New data flow.

  4. Give the newly created data flow a unique name. In this example, the data flow is named cosmoscdc.

    Screenshot of a new data flow with the name cosmoscdc.

Configure source settings for the analytical store container

Now create and configure a source to flow data from the Azure Cosmos DB account's analytical store.

  1. Select Add Source.

    Screenshot of the add source menu option.

  2. In the Output stream name field, enter cosmos.

    Screenshot of naming the newly created source cosmos.

  3. In the Source type section, select Inline.

    Screenshot of selecting the inline source type.

  4. In the Dataset field, select Azure - Azure Cosmos DB for NoSQL.

    Screenshot of selecting Azure Cosmos DB for NoSQL as the dataset type.

  5. Create a new linked service for your account named cosmoslinkedservice. Select your existing Azure Cosmos DB for NoSQL account in the New linked service popup dialog and then select Ok. In this example, we select a pre-existing Azure Cosmos DB for NoSQL account named msdocs-cosmos-source and a database named cosmicworks.

    Screenshot of the New linked service dialog with an Azure Cosmos DB account selected.

  6. Select Analytical for the store type.

    Screenshot of the analytical option selected for a linked service.

  7. Select the Source options tab.

  8. Within Source options, select your target container and enable Data flow debug. In this example, the container is named products.

    Screenshot of a source container selected named products.

  9. Select Data flow debug. In the Turn on data flow debug popup dialog, retain the default options and then select Ok.

    Screenshot of the toggle option to enable data flow debug.

  10. The Source options tab also contains other options you may wish to enable. This table describes those options:

Option | Description
Capture intermediate updates | Enable this option if you want to capture the history of changes to items, including the intermediate changes between change data capture reads.
Capture Deletes | Enable this option to capture user-deleted records and apply them on the sink. Deletes can't be applied on Azure Data Explorer and Azure Cosmos DB sinks.
Capture Transactional store TTLs | Enable this option to capture records deleted by the Azure Cosmos DB transactional store time-to-live (TTL) and apply them on the sink. TTL deletes can't be applied on Azure Data Explorer and Azure Cosmos DB sinks.
Batch size in bytes | This value is specified in gigabytes, despite the name. Specify the size in gigabytes if you want to batch the change data capture feeds.
Extra Configs | Extra Azure Cosmos DB analytical store configs and their values. (Example: spark.cosmos.allowWhiteSpaceInFieldNames -> true)

Working with source options

When you check any of the Capture intermediate updates, Capture Deletes, and Capture Transactional store TTLs options, your CDC process creates and populates the __usr_opType field in the sink with the following values:

Value | Description | Option
1 | UPDATE | Capture intermediate updates
2 | INSERT | There isn't an option for inserts; it's on by default
3 | USER_DELETE | Capture Deletes
4 | TTL_DELETE | Capture Transactional store TTLs

If you need to differentiate TTL-deleted records from documents deleted by users or applications, check both the Capture Deletes and Capture Transactional store TTLs options. Then adapt your CDC processes, applications, or queries to use __usr_opType according to your business needs.
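
For example, a downstream process that reads the Delta output written by the sink you configure later in this guide could split rows on __usr_opType. The following PySpark sketch is illustrative only; it assumes a Spark session with Delta Lake support and access to the storage account, and the storage path is a placeholder.

    # Illustrative PySpark sketch (not part of the data flow itself), assuming a
    # Spark session with Delta Lake support. The storage path is a placeholder for
    # the Delta output written by the sink.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.appName("cdc-optype-demo").getOrCreate()

    changes = spark.read.format("delta").load(
        "wasbs://output@msdocsblobstorage.blob.core.windows.net/"
    )

    updates = changes.filter(col("__usr_opType") == 1)       # Capture intermediate updates
    inserts = changes.filter(col("__usr_opType") == 2)       # captured by default
    user_deletes = changes.filter(col("__usr_opType") == 3)  # Capture Deletes
    ttl_deletes = changes.filter(col("__usr_opType") == 4)   # Capture Transactional store TTLs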

Tip

If downstream consumers need to restore the order of updates when the Capture intermediate updates option is checked, the system timestamp _ts field can be used as the ordering field.
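
As a sketch of that approach, the following PySpark fragment keeps only the most recent change per document by partitioning on the document identifier and ordering on _ts. It assumes the _rid and _ts columns are present in the sink output (this guide uses _rid as the sink key column) and reuses the placeholder storage path from the previous example.

    # Illustrative continuation of the previous sketch: restore per-document order
    # with _ts and keep only the latest change for each _rid.
    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.appName("cdc-ordering-demo").getOrCreate()

    changes = spark.read.format("delta").load(
        "wasbs://output@msdocsblobstorage.blob.core.windows.net/"
    )

    latest_per_doc = (
        changes
        .withColumn("rn", F.row_number().over(
            Window.partitionBy("_rid").orderBy(F.col("_ts").desc())))
        .filter(F.col("rn") == 1)
        .drop("rn")
    )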

Create and configure sink settings for update and delete operations

First, create a straightforward Azure Blob Storage sink and then configure the sink to filter data to only specific operations.

  1. Create an Azure Blob Storage account and container, if you don't already have one. For the next examples, we'll use an account named msdocsblobstorage and a container named output.

    Tip

    If possible, create the storage account in the same region where your Azure Cosmos DB account resides.

  2. Back in Azure Data Factory, create a new sink for the change data captured from your cosmos source.

    Screenshot of adding a new sink that's connected to the existing source.

  3. Give the sink a unique name. In this example, the sink is named storage.

    Screenshot of naming the newly created sink storage.

  4. In the Sink type section, select Inline. In the Dataset field, select Delta.

    Screenshot of selecting an Inline Delta dataset type for the sink.

  5. Create a new linked service for your account using Azure Blob Storage named storagelinkedservice. Select your existing Azure Blob Storage account in the New linked service popup dialog and then select Ok. In this example, we select a pre-existing Azure Blob Storage account named msdocsblobstorage.

    Screenshot of the service type options for a new Delta linked service.

    Screenshot of the New linked service dialog with an Azure Blob Storage account selected.

  6. Select the Settings tab.

  7. Within Settings, set the Folder path to the name of the blob container. In this example, the container's name is output.

    Screenshot of the blob container named output set as the sink target.

  8. Locate the Update method section and change the selections to only allow delete and update operations. Also, specify the Key columns as a List of columns using the field {_rid} as the unique identifier.

    Screenshot of update methods and key columns being specified for the sink.

  9. Select Validate to ensure you haven't made any errors or omissions. Then, select Publish to publish the data flow.

    Screenshot of the option to validate and then publish the current data flow.

Schedule change data capture execution

After a data flow has been published, you can add a new pipeline to move and transform your data.

  1. Create a new pipeline. Give the pipeline a unique name. In this example, the pipeline is named cosmoscdcpipeline.

    Screenshot of the new pipeline option within the resources section.

  2. In the Activities section, expand the Move & transform option and then select Data flow.

    Screenshot of the data flow activity option within the activities section.

  3. Give the data flow activity a unique name. In this example, the activity is named cosmoscdcactivity.

  4. In the Settings tab, select the data flow named cosmoscdc you created earlier in this guide. Then, select a compute size based on the data volume and required latency for your workload.

    Screenshot of the configuration settings for both the data flow and compute size for the activity.

    Tip

    For incremental data sizes greater than 100 GB, we recommend the Custom size with a core count of 32 (+16 driver cores).

  5. Select Add trigger. Schedule this pipeline to execute at a cadence that makes sense for your workload. In this example, the pipeline is configured to execute every five minutes.

    Screenshot of the add trigger button for a new pipeline.

    Screenshot of a trigger configuration based on a schedule, starting in the year 2023, that runs every five minutes.

    Note

    The minimum recurrence window for change data capture executions is one minute.

  6. Select Validate to ensure you haven't made any errors or omissions. Then, select Publish to publish the pipeline.

  7. Observe the data placed into the Azure Blob Storage container as an output of the data flow using Azure Cosmos DB analytical store change data capture. You can also inspect the output from code, as sketched after this procedure.

    Screenshot of the output files from the pipeline in the Azure Blob Storage container.

    Note

    The initial cluster startup may take up to three minutes. To avoid cluster startup time in subsequent change data capture executions, configure the data flow cluster Time to live value. For more information about the integration runtime and TTL, see integration runtime in Azure Data Factory.
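
If you'd rather verify the output from code, the following Python sketch lists the blobs written to the sink container by using the azure-storage-blob package. The account URL and credential are placeholders for the msdocsblobstorage account and output container used in this guide.

    # Minimal verification sketch, assuming the azure-storage-blob package
    # (pip install azure-storage-blob). Account URL and credential are placeholders.
    from azure.storage.blob import BlobServiceClient

    service = BlobServiceClient(
        account_url="https://msdocsblobstorage.blob.core.windows.net",
        credential="<storage-account-key>",
    )
    container = service.get_container_client("output")

    # The Delta output appears as Parquet data files plus a _delta_log folder.
    for blob in container.list_blobs():
        print(blob.name, blob.size)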

Concurrent jobs

If the batch size in the source options is too small for the data volume, or if the sink is slow to ingest the stream of changes, multiple jobs might execute at the same time. To avoid this situation, set the Concurrency option to 1 in the pipeline settings so that new executions aren't triggered until the current execution completes.

Next steps