Gegevens van Apache Kafka opnemen in Azure Cosmos DB voor Apache Cassandra met behulp van Kafka Connect
VAN TOEPASSING OP: Cassandra
Bestaande Cassandra-toepassingen kunnen eenvoudig werken met Azure Cosmos DB voor Apache Cassandra vanwege de compatibiliteit van het CQLv4-stuurprogramma. U kunt deze mogelijkheid gebruiken om te integreren met streamingplatforms zoals Apache Kafka en gegevens naar Azure Cosmos DB te brengen.
Gegevens in Apache Kafka (onderwerpen) zijn alleen nuttig wanneer ze worden gebruikt door andere toepassingen of worden opgenomen in andere systemen. Het is mogelijk om een oplossing te bouwen met behulp van de Producer/Consumer-API's van Kafkamet behulp van een taal en client-SDK van uw keuze. Kafka Connect biedt een alternatieve oplossing. Het is een platform voor het op een schaalbare en betrouwbare manier streamen van gegevens tussen Apache Kafka en andere systemen. Omdat Kafka Connect standaardconnectors ondersteunt, waaronder Cassandra, hoeft u geen aangepaste code te schrijven om Kafka te integreren met Azure Cosmos DB voor Apache Cassandra.
In dit artikel gebruiken we de opensource-DataStax Apache Kafka-connector, die boven op het Kafka Connect-framework werkt om records uit een Kafka-onderwerp op te nemen in rijen van een of meer Cassandra-tabellen. Het voorbeeld biedt een herbruikbare installatie met behulp van Docker Compose. Dit is vrij handig omdat u hiermee alle vereiste onderdelen lokaal kunt bootstrappen met één opdracht. Deze onderdelen omvatten Kafka, Zookeeper, Kafka Connect-werkrol en de voorbeeldtoepassing voor gegevensgenerator.
Hier volgt een uitsplitsing van de onderdelen en de bijbehorende servicedefinities. U kunt het volledige docker-compose
bestand in de GitHub-opslagplaats raadplegen.
- Kafka en Zookeeper gebruiken debezium-installatiekopieën .
- Als u wilt uitvoeren als een Docker-container, wordt de DataStax Apache Kafka-connector boven op een bestaande Docker-installatiekopie gebakken: debezium/connect-base. Deze installatiekopieën bevatten een installatie van Kafka en de Bijbehorende Kafka Connect-bibliotheken, waardoor het erg handig is om aangepaste connectors toe te voegen. Raadpleeg het Dockerfile.
- De
data-generator
service verwerkt willekeurig gegenereerde (JSON)-gegevens in hetweather-data
Kafka-onderwerp. U kunt verwijzen naar de code enDockerfile
in de GitHub-opslagplaats
Vereisten
Docker en Docker Compose installeren
Keyspace, tabellen maken en de integratiepijplijn starten
Maak met behulp van de Azure Portal de Cassandra Keyspace en de tabellen die vereist zijn voor de demotoepassing.
Notitie
Gebruik dezelfde keyspace- en tabelnamen als hieronder
CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
Kloon de GitHub-opslagplaats:
git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka
Start alle services:
docker-compose --project-name kafka-cosmos-cassandra up --build
Notitie
Het downloaden en starten van de containers kan even duren: dit is slechts een eenmalig proces.
Controleren of alle containers zijn gestart:
docker-compose -p kafka-cosmos-cassandra ps
De gegevensgeneratortoepassing begint met het pompen van gegevens in het weather-data
onderwerp in Kafka. U kunt ook een snelle saniteitscontrole uitvoeren om te bevestigen. Bekijk de Docker-container waarop de Kafka Connect-werkrol wordt uitgevoerd:
docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash
Zodra u zich in de containershell bevindt, start u het gebruikelijke consumentenproces van de Kafka-console en ziet u weergegevens (in JSON-indeling) binnenkomen.
cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data
Installatie van Cassandra Sink-connector
Kopieer de onderstaande JSON-inhoud naar een bestand (u kunt dit de naam geven cassandra-sink-config.json
). U moet deze bijwerken op basis van uw installatie en de rest van deze sectie bevat richtlijnen voor dit onderwerp.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "weather-data",
"contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
"port": 10350,
"loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
"auth.username": "<enter username for cosmosdb account>",
"auth.password": "<enter password for cosmosdb account>",
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
"ssl.keystore.password": "changeit",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"offset.flush.interval.ms": 10000
}
}
Hier volgt een samenvatting van de kenmerken:
Basisconnectiviteit
contactPoints
: voer het contactpunt voor Azure Cosmos DB Cassandra inloadBalancing.localDc
: voer de regio voor het Azure Cosmos DB-account in, bijvoorbeeld Azië - zuidoostauth.username
: voer de gebruikersnaam inauth.password
: voer het wachtwoord inport
: voer de poortwaarde in (dit is10350
, niet9042
. laat deze staan zoals het is)
SSL-configuratie
Azure Cosmos DB dwingt beveiligde connectiviteit af via SSL en de Kafka Connect-connector biedt ook ondersteuning voor SSL.
ssl.keystore.path
: pad naar het JDK-sleutelarchief in de container -/etc/alternatives/jre/lib/security/cacerts/
ssl.keystore.password
: wachtwoord voor JDK-sleutelarchief (standaard)ssl.hostnameValidation
: We maken de validatie van de hostnaam van het eigen knooppuntssl.provider
:JDK
wordt gebruikt als de SSL-provider
Algemene parameters
key.converter
: We gebruiken het tekenreeksconversieprogrammaorg.apache.kafka.connect.storage.StringConverter
value.converter
: omdat de gegevens in Kafka-onderwerpen JSON zijn, maken we gebruik vanorg.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable
: Omdat er geen schema aan onze JSON-nettolading is gekoppeld (voor de demo-app), moeten we Kafka Connect instrueren om niet naar een schema te zoeken door dit kenmerk in te stellen opfalse
. Als u dit niet doet, leidt dit tot fouten.
Installeer de connector
Installeer de connector met behulp van het Kafka Connect REST-eindpunt:
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
De status controleren:
curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status
Als alles goed gaat, moet de verbindingslijn beginnen met het weven van de magie. Het moet worden geverifieerd bij Azure Cosmos DB en beginnen met het opnemen van gegevens uit het Kafka-onderwerp (weather-data
) in Cassandra-tabellen - weather.data_by_state
en weather.data_by_station
U kunt nu query's uitvoeren op gegevens in de tabellen. Ga naar de Azure Portal en open de gehoste CQL Shell voor uw Azure Cosmos DB-account.
Query's uitvoeren op gegevens uit Azure Cosmos DB
Controleer de data_by_state
tabellen en data_by_station
. Hier volgen enkele voorbeeldquery's om u op weg te helpen:
select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');
select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');
Resources opschonen
Wanneer u uw app en Azure Cosmos DB-account niet meer nodig hebt, kunt u de Azure-resources die u hebt gemaakt, verwijderen zodat er geen kosten meer voor in rekening worden gebracht. Om de resources te verwijderen:
Zoek en selecteer Resourcegroepen in de zoekbalk op Azure Portal.
Selecteer de resourcegroep die u eerder voor deze quickstart hebt gemaakt uit de lijst.
Selecteer Resourcegroep verwijderen op de pagina Overzicht van de resourcegroep.
Selecteer in het volgende venster de naam van de resourcegroep die u wilt verwijderen en selecteer vervolgens Verwijderen.