APPLIES TO:
Cassandra
Existing Cassandra applications can easily work with Azure Cosmos DB for Apache Cassandra because of its CQLv4 driver compatibility. You can use this compatibility to integrate with streaming platforms such as Apache Kafka and bring data into Azure Cosmos DB.
Data in Apache Kafka topics is only useful when it's consumed by other applications or ingested into other systems. You could build a solution with the Kafka Producer/Consumer APIs using a language and client SDK of your choice, but Kafka Connect provides an alternative: a platform to stream data between Apache Kafka and other systems in a scalable and reliable manner. Because Kafka Connect supports off-the-shelf connectors, including one for Cassandra, you don't need to write custom code to integrate Kafka with Azure Cosmos DB for Apache Cassandra.
This article uses the open-source DataStax Apache Kafka connector, which works on top of the Kafka Connect framework, to ingest records from a Kafka topic into rows of Cassandra tables. The example provides a reusable Docker Compose setup that lets you bootstrap all the required components locally with a single command: Kafka, Zookeeper, a Kafka Connect worker, and the sample data generator application.
Here's a breakdown of the components and their service definitions. Refer to the complete docker-compose file in the GitHub repo.
- Kafka and Zookeeper use debezium images.
- To run as a Docker container, the DataStax Apache Kafka Connector is included on top of an existing Docker image: debezium/connect-base. This image includes an installation of Kafka and its Kafka Connect libraries, which makes it convenient to add custom connectors. Refer to the Dockerfile.
- The data-generator service seeds randomly generated (JSON) data into the weather-data Kafka topic. Refer to the code and Dockerfile in the GitHub repo.
Prerequisites
- Provision an Azure Cosmos DB for Apache Cassandra account
- Use cqlsh for validation
- Install Docker and Docker Compose
Create the keyspace and tables, and start the integration pipeline
Using the Azure portal, create the Cassandra Keyspace and the tables required for the demo application.
Note
Use the same Keyspace and table names as used here.
CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
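As a local alternative to the portal's hosted CQL Shell, you can validate the keyspace and tables with cqlsh (listed in the prerequisites). This is a hedged sketch: the account name and password below are placeholders, and the SSL_VERSION/SSL_VALIDATE variables are the settings a stock cqlsh typically needs to reach the Cosmos DB TLS endpoint. The command is echoed as a dry run; drop the leading `echo` to actually connect.

```shell
# Hypothetical account details -- replace with your own.
ACCOUNT="mycosmosaccount"
PASSWORD="your-primary-password"

# Settings cqlsh needs for the Cosmos DB TLS endpoint (an assumption
# based on common cqlsh-to-Cosmos setups).
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false

# Echoed as a dry run; remove the leading `echo` to connect for real.
echo cqlsh "${ACCOUNT}.cassandra.cosmos.azure.com" 10350 \
  -u "${ACCOUNT}" -p "${PASSWORD}" --ssl
```

Once connected, `DESCRIBE KEYSPACE weather;` should show both tables.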
Clone the GitHub repo:
git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka
Start all the services:
docker-compose --project-name kafka-cosmos-cassandra up --build
Note
It might take a while to download and start the containers. This is a one-time process.
To confirm that all the containers have started:
docker-compose -p kafka-cosmos-cassandra ps
The data generator application starts pumping data into the weather-data topic in Kafka. You can also do a quick check to confirm. Peek into the Docker container running the Kafka Connect worker:
docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash
After you drop into the container shell, start the usual Kafka console consumer process. You should see weather data in JSON format flowing in.
cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data
Cassandra Sink connector setup
Copy the JSON contents below to a file named cassandra-sink-config.json. You need to update it for your setup; the rest of this section provides guidance.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "weather-data",
"contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
"port": 10350,
"loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
"auth.username": "<enter username for cosmosdb account>",
"auth.password": "<enter password for cosmosdb account>",
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
"ssl.keystore.password": "changeit",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"offset.flush.interval.ms": 10000
}
}
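The placeholders can be filled in by hand, or scripted. Here's a small sketch (not part of the official walkthrough) showing a sed substitution against the contactPoints field; COSMOS_ACCOUNT is an assumed example value, and note that for Azure Cosmos DB for Apache Cassandra the contact point is derived from the account name.

```shell
# COSMOS_ACCOUNT is a hypothetical example value -- use your own.
COSMOS_ACCOUNT="mycosmosaccount"

# Substitute the placeholder in the contactPoints field, shown here on a
# single line; the same pattern works on the full config file.
echo '"contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",' \
  | sed "s/<cosmos db account name>/${COSMOS_ACCOUNT}/"
```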
Here's a summary of the attributes:

Basic connectivity

- contactPoints: Enter the contact point for Azure Cosmos DB Cassandra
- loadBalancing.localDc: Enter the region for the Azure Cosmos DB account, such as Southeast Asia
- auth.username: Enter the username
- auth.password: Enter the password
- port: Enter the port value. This value is 10350, not 9042. Leave it as is.

SSL configuration

Azure Cosmos DB enforces secure connectivity over SSL, and the Kafka Connect connector supports SSL as well.

- ssl.keystore.path: Path to the JDK keystore in the container: /etc/alternatives/jre/lib/security/cacerts/
- ssl.keystore.password: JDK keystore (default) password
- ssl.hostnameValidation: We turn on node hostname validation
- ssl.provider: JDK is used as the SSL provider

Generic parameters

- key.converter: We use the string converter org.apache.kafka.connect.storage.StringConverter
- value.converter: Since the data in Kafka topics is JSON, we use org.apache.kafka.connect.json.JsonConverter
- value.converter.schemas.enable: Since our JSON payload doesn't have a schema associated with it for the purposes of the demo app, we need to instruct Kafka Connect to not look for a schema by setting this attribute to false. Not doing so results in failures.
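To make the mapping and converter settings concrete, here's a hypothetical record shaped like the generator's output. The field names (stationid, temp, state, created) are assumptions matching the value.* references in the mapping, and the payload is schemaless JSON, which is why value.converter.schemas.enable is set to false.

```shell
# A hypothetical weather-data record; the field names line up with the
# mapping's value.stationid, value.temp, value.state, and value.created.
record='{"stationid": "station-1", "temp": 32, "state": "state-1", "created": 1606300800000}'

# The JsonConverter sees it as plain (schemaless) JSON:
echo "$record" | python3 -m json.tool
```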
Install the connector
Install the connector using the Kafka Connect REST endpoint:
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
To check the status:
curl http://localhost:8083/connectors/kafka-cosmosdb-sink/status
If all goes well, the connector should start weaving its magic. It should authenticate to Azure Cosmos DB and start ingesting data from the Kafka topic (weather-data) into the Cassandra tables weather.data_by_state and weather.data_by_station.
You can now query data in the tables. In the Azure portal, bring up the hosted CQL Shell for your Azure Cosmos DB account.
Query data from Azure Cosmos DB
Check the data_by_state and data_by_station tables. Here are some sample queries to get you started:
select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');
select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');
Clean up resources
When you're done with your app and Azure Cosmos DB account, you can delete the Azure resources you created so you don't incur more charges. To delete the resources:
1. In the Azure portal Search bar, search for and select Resource groups.
2. From the list, select the resource group you created for this quickstart.
3. On the resource group Overview page, select Delete resource group.
4. In the next window, enter the name of the resource group to delete, and then select Delete.