Kafka Connect for Azure Cosmos DB - source connector
APPLIES TO: NoSQL
Kafka Connect for Azure Cosmos DB is a connector for reading data from and writing data to Azure Cosmos DB. The Azure Cosmos DB source connector reads data from the Azure Cosmos DB change feed and publishes it to a Kafka topic.
Prerequisites
- Start with the Confluent platform setup because it gives you a complete environment to work with. If you don't wish to use Confluent Platform, then you need to install and configure ZooKeeper, Apache Kafka, and Kafka Connect yourself. You'll also need to install and configure the Azure Cosmos DB connectors manually.
- Create an Azure Cosmos DB account and container by following the container setup guide.
- Bash shell, tested on GitHub Codespaces, Mac, Ubuntu, and Windows with WSL2. This shell doesn't work in Cloud Shell or WSL1.
- Download Java 11+
- Download Maven
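Before you continue, you can quickly verify that the Java and Maven prerequisites are available on your PATH:
# confirm Java 11+ and Maven are installed and on the PATH
java -version
mvn -v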
Install the source connector
If you're using the recommended Confluent platform setup, the Azure Cosmos DB source connector is included in the installation, and you can skip this step.
Otherwise, you can download the JAR file from the latest release and install the connector manually. To learn more, see these instructions. You can also package a new JAR file from the source code:
# clone the kafka-connect-cosmosdb repo if you haven't done so already
git clone https://github.com/microsoft/kafka-connect-cosmosdb.git
cd kafka-connect-cosmosdb
# package the source code into a JAR file
mvn clean package
# include the following JAR file in your Confluent Platform installation
ls target/*dependencies.jar
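Where exactly the JAR goes depends on your installation. As a minimal sketch, assuming your Connect worker's plugin.path includes the directory /usr/share/confluent-hub-components (an example path; yours may differ), you could copy the JAR there and then restart the worker:
# copy the packaged JAR onto the Connect worker's plugin path
# (adjust the destination to match the plugin.path in your worker config)
cp target/*dependencies.jar /usr/share/confluent-hub-components/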
Create a Kafka topic
Create a Kafka topic using Confluent Control Center. For this scenario, you'll create a Kafka topic named "apparels" and write non-schema embedded JSON data to it. To create a topic inside Control Center, see the create Kafka topic doc.
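If you'd rather not use Control Center, the same topic can be created with the Kafka CLI. This sketch assumes the Confluent Platform CLI tools are on your PATH and a broker is listening on localhost:9092:
# create the "apparels" topic with a single partition
kafka-topics --bootstrap-server localhost:9092 --create --topic apparels --partitions 1 --replication-factor 1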
Create the source connector
Create the source connector in Kafka Connect
To create the Azure Cosmos DB source connector in Kafka Connect, use the following JSON config. Make sure to replace the placeholder values for the connect.cosmos.connection.endpoint and connect.cosmos.master.key properties with the values you saved from the Azure Cosmos DB setup guide in the prerequisites.
{
  "name": "cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connect.cosmos.task.poll.interval": "100",
    "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
    "connect.cosmos.master.key": "<cosmosdbprimarykey>",
    "connect.cosmos.databasename": "kafkaconnect",
    "connect.cosmos.containers.topicmap": "apparels#kafka",
    "connect.cosmos.offset.useLatest": false,
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false"
  }
}
For more information on each of the above configuration properties, see the source properties section. Once you have all the values filled out, save the JSON file somewhere locally. You can use this file to create the connector using the REST API.
Create connector using Control Center
An easy option to create the connector is from the Confluent Control Center portal. Follow the Confluent setup guide to create a connector from Control Center. When setting up, instead of using the DatagenConnector option, select the CosmosDBSourceConnector tile. When configuring the source connector, fill out the values just as you filled in the JSON file.
Alternatively, on the connectors page, you can upload the JSON file built in the previous section by using the Upload connector config file option.
Create connector using REST API
Create the source connector using the Connect REST API:
# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors
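Once the POST returns, you can confirm that the connector and its task are running by querying the status endpoint of the Connect REST API:
# check connector and task state; look for "state": "RUNNING"
curl http://localhost:8083/connectors/cosmosdb-source-connector/status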
Insert document into Azure Cosmos DB
Sign in to the Azure portal and navigate to your Azure Cosmos DB account.
Open the Data Explorer tab and select Databases.
Open the "kafkaconnect" database and "kafka" container you created earlier.
To create a new JSON document, in the API for NoSQL pane, expand the "kafka" container, select Items, and then select New Item on the toolbar.
Now, add a document to the container with the following structure. Paste the following sample JSON block into the Items tab, overwriting the current content:
{ "id": "2", "productId": "33218897", "category": "Women's Outerwear", "manufacturer": "Contoso", "description": "Black wool pea-coat", "price": "49.99", "shipping": { "weight": 2, "dimensions": { "width": 8, "height": 11, "depth": 3 } } }
Select Save.
Confirm the document has been saved by viewing the Items on the left-hand menu.
Confirm data written to Kafka topic
- Open the Kafka Topic UI on http://localhost:9000.
- Select the Kafka "apparels" topic you created.
- Verify that the document you inserted into Azure Cosmos DB earlier appears in the Kafka topic. (Alternatively, read the topic from the command line, as shown after this list.)
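As an alternative to the Topic UI, you can read the topic with the Kafka console consumer. This sketch assumes a broker listening on localhost:9092:
# print every record on the topic, starting from the earliest offset
kafka-console-consumer --bootstrap-server localhost:9092 --topic apparels --from-beginning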
Cleanup
To delete the connector from the Confluent Control Center, navigate to the source connector you created and select the Delete icon.
Alternatively, use the connector’s REST API:
# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-source-connector
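To confirm the deletion, list the remaining connectors; the response should no longer include cosmosdb-source-connector:
# list all connectors registered with this Connect worker
curl http://localhost:8083/connectors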
To delete the created Azure Cosmos DB service and its resource group using Azure CLI, refer to these steps.
Source configuration properties
The following settings are used to configure the Kafka source connector. These configuration values determine which Azure Cosmos DB container is consumed, which Kafka topics the data is written to, and the formats used to serialize the data. For an example with default values, see this configuration file.
Name | Type | Description | Required/optional |
---|---|---|---|
connector.class | String | Class name of the Azure Cosmos DB source. It should be set to com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector. | Required |
connect.cosmos.databasename | String | Name of the database to read from. | Required |
connect.cosmos.master.key | String | The Azure Cosmos DB primary key. | Required |
connect.cosmos.connection.endpoint | URI | The account endpoint. | Required |
connect.cosmos.containers.topicmap | String | Comma-separated topic-to-container mapping. For example, topic1#coll1,topic2#coll2 | Required |
connect.cosmos.connection.gateway.enabled | Boolean | Flag to indicate whether to use gateway mode. Default is false. | Optional |
connect.cosmos.messagekey.enabled | Boolean | Indicates whether the Kafka message key should be set. Default is true. | Required |
connect.cosmos.messagekey.field | String | Use the field's value from the document as the message key. Default is id. | Required |
connect.cosmos.offset.useLatest | Boolean | Set to true to use the most recent source offset. Set to false to use the earliest recorded offset. Default is false. | Required |
connect.cosmos.task.poll.interval | Int | Interval to poll the change feed container for changes. | Required |
key.converter | String | Serialization format for the key data written into the Kafka topic. | Required |
value.converter | String | Serialization format for the value data written into the Kafka topic. | Required |
key.converter.schemas.enable | String | Set to true if the key data has an embedded schema. | Optional |
value.converter.schemas.enable | String | Set to true if the value data has an embedded schema. | Optional |
tasks.max | Int | Maximum number of connector source tasks. Default is 1. | Optional |
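To see which of these values a running connector is actually using, you can read its configuration back from the Connect REST API:
# fetch the stored configuration of the source connector
curl http://localhost:8083/connectors/cosmosdb-source-connector/config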
Supported data types
The Azure Cosmos DB source connector converts JSON documents to schemas and supports the following JSON data types:
JSON data type | Schema type |
---|---|
Array | Array |
Boolean | Boolean |
Number | Float32, Float64, Int8, Int16, Int32, Int64 |
Null | String |
Object (JSON) | Struct |
String | String |
Next steps
- Kafka Connect for Azure Cosmos DB sink connector