Ingest data from Apache Kafka into Azure Cosmos DB for Apache Cassandra using Kafka Connect
APPLIES TO: Cassandra
Existing Cassandra applications can easily work with the Azure Cosmos DB for Apache Cassandra because of its CQLv4 driver compatibility. You use this capability to integrate with streaming platforms such as Apache Kafka and bring data into Azure Cosmos DB.
Data in Apache Kafka (topics) is only useful when consumed by other applications or ingested into other systems. It's possible to build a solution using the Kafka Producer/Consumer APIs using a language and client SDK of your choice. Kafka Connect provides an alternative solution. It's a platform to stream data between Apache Kafka and other systems in a scalable and reliable manner. Since Kafka Connect supports off the shelf connectors which include Cassandra, you don't need to write custom code to integrate Kafka with Azure Cosmos DB for Apache Cassandra.
In this article, we are using the open-source DataStax Apache Kafka connector, that works on top of Kafka Connect framework to ingest records from a Kafka topic into rows of one or more Cassandra tables. The example provides a reusable setup using Docker Compose. This is convenient since it enables you to bootstrap all the required components locally with a single command. These components include Kafka, Zookeeper, Kafka Connect worker, and the sample data generator application.
Here's a breakdown of the components and their service definitions - you can refer to the complete docker-compose
file in the GitHub repo.
- Kafka and Zookeeper use debezium images.
- To run as a Docker container, the DataStax Apache Kafka Connector is baked on top of an existing Docker image - debezium/connect-base. This image includes an installation of Kafka and its Kafka Connect libraries, thus making it convenient to add custom connectors. You can refer to the Dockerfile.
- The
data-generator
service seeds randomly generated (JSON) data into theweather-data
Kafka topic. You can refer to the code andDockerfile
in the GitHub repo
Prerequisites
Install Docker and Docker Compose
Create Keyspace, tables and start the integration pipeline
Using the Azure portal, create the Cassandra Keyspace and the tables required for the demo application.
Note
Use the same Keyspace and table names as below
CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
Clone the GitHub repo:
git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka
Start all the services:
docker-compose --project-name kafka-cosmos-cassandra up --build
Note
It might take a while to download and start the containers: this is just a one time process.
To confirm whether all the containers have started:
docker-compose -p kafka-cosmos-cassandra ps
The data generator application starts pumping data into the weather-data
topic in Kafka. You can also do quick check to confirm. Peek into the Docker container running the Kafka connect worker:
docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash
Once you drop into the container shell, just start the usual Kafka console consumer process and you should see weather data (in JSON format) flowing in.
cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data
Cassandra Sink connector setup
Copy the JSON contents below to a file (you can name it cassandra-sink-config.json
). You need to update it as per your setup and the rest of this section will provide guidance around this topic.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "weather-data",
"contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
"port": 10350,
"loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
"auth.username": "<enter username for cosmosdb account>",
"auth.password": "<enter password for cosmosdb account>",
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
"ssl.keystore.password": "changeit",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"offset.flush.interval.ms": 10000
}
}
Here's a summary of the attributes:
Basic connectivity
contactPoints
: enter the contact point for Azure Cosmos DB CassandraloadBalancing.localDc
: enter the region for Azure Cosmos DB account e.g. Southeast Asiaauth.username
: enter the usernameauth.password
: enter the passwordport
: enter the port value (this is10350
, not9042
. leave it as is)
SSL configuration
Azure Cosmos DB enforces secure connectivity over SSL and Kafka Connect connector supports SSL as well.
ssl.keystore.path
: path to the JDK keystore in the container -/etc/alternatives/jre/lib/security/cacerts/
ssl.keystore.password
: JDK keystore (default) passwordssl.hostnameValidation
: We turn own node hostname validationssl.provider
:JDK
is used as the SSL provider
Generic parameters
key.converter
: We use the string converterorg.apache.kafka.connect.storage.StringConverter
value.converter
: since the data in Kafka topics is JSON, we make use oforg.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable
: Since our JSON payload doesn't have a schema associated with it (for the purposes of the demo app), we need to instruct Kafka Connect to not look for a schema by setting this attribute tofalse
. Not doing so results in failures.
Install the connector
Install the connector using the Kafka Connect REST endpoint:
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
To check the status:
curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status
If all goes well, the connector should start weaving its magic. It should authenticate to Azure Cosmos DB and start ingesting data from the Kafka topic (weather-data
) into Cassandra tables - weather.data_by_state
and weather.data_by_station
You can now query data in the tables. Head over to the Azure portal, bring up the hosted CQL Shell for your Azure Cosmos DB account.
Query data from Azure Cosmos DB
Check the data_by_state
and data_by_station
tables. Here's some sample queries to get you started:
select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');
select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');
Clean up resources
When you're done with your app and Azure Cosmos DB account, you can delete the Azure resources you created so you don't incur more charges. To delete the resources:
In the Azure portal Search bar, search for and select Resource groups.
From the list, select the resource group you created for this quickstart.
On the resource group Overview page, select Delete resource group.
In the next window, enter the name of the resource group to delete, and then select Delete.