Gegevens van Apache Kafka opnemen in Azure Cosmos DB voor Apache Cassandra met behulp van Kafka Connect

VAN TOEPASSING OP: Cassandra

Bestaande Cassandra-toepassingen kunnen eenvoudig werken met Azure Cosmos DB voor Apache Cassandra vanwege de compatibiliteit van het CQLv4-stuurprogramma. U kunt deze mogelijkheid gebruiken om te integreren met streamingplatforms zoals Apache Kafka en gegevens naar Azure Cosmos DB te brengen.

Gegevens in Apache Kafka (onderwerpen) zijn alleen nuttig wanneer ze worden gebruikt door andere toepassingen of worden opgenomen in andere systemen. Het is mogelijk om een oplossing te bouwen met behulp van de Producer/Consumer-API's van Kafkamet behulp van een taal en client-SDK van uw keuze. Kafka Connect biedt een alternatieve oplossing. Het is een platform voor het op een schaalbare en betrouwbare manier streamen van gegevens tussen Apache Kafka en andere systemen. Omdat Kafka Connect standaardconnectors ondersteunt, waaronder Cassandra, hoeft u geen aangepaste code te schrijven om Kafka te integreren met Azure Cosmos DB voor Apache Cassandra.

In dit artikel gebruiken we de opensource-DataStax Apache Kafka-connector, die boven op het Kafka Connect-framework werkt om records uit een Kafka-onderwerp op te nemen in rijen van een of meer Cassandra-tabellen. Het voorbeeld biedt een herbruikbare installatie met behulp van Docker Compose. Dit is vrij handig omdat u hiermee alle vereiste onderdelen lokaal kunt bootstrappen met één opdracht. Deze onderdelen omvatten Kafka, Zookeeper, Kafka Connect-werkrol en de voorbeeldtoepassing voor gegevensgenerator.

Hier volgt een uitsplitsing van de onderdelen en de bijbehorende servicedefinities. U kunt het volledige docker-compose bestand in de GitHub-opslagplaats raadplegen.

  • Kafka en Zookeeper gebruiken debezium-installatiekopieën .
  • Als u wilt uitvoeren als een Docker-container, wordt de DataStax Apache Kafka-connector boven op een bestaande Docker-installatiekopie gebakken: debezium/connect-base. Deze installatiekopieën bevatten een installatie van Kafka en de Bijbehorende Kafka Connect-bibliotheken, waardoor het erg handig is om aangepaste connectors toe te voegen. Raadpleeg het Dockerfile.
  • De data-generator service verwerkt willekeurig gegenereerde (JSON)-gegevens in het weather-data Kafka-onderwerp. U kunt verwijzen naar de code en Dockerfile in de GitHub-opslagplaats

Vereisten

Keyspace, tabellen maken en de integratiepijplijn starten

Maak met behulp van de Azure Portal de Cassandra Keyspace en de tabellen die vereist zijn voor de demotoepassing.

Notitie

Gebruik dezelfde keyspace- en tabelnamen als hieronder

CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Kloon de GitHub-opslagplaats:

git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka

Start alle services:

docker-compose --project-name kafka-cosmos-cassandra up --build

Notitie

Het downloaden en starten van de containers kan even duren: dit is slechts een eenmalig proces.

Controleren of alle containers zijn gestart:

docker-compose -p kafka-cosmos-cassandra ps

De gegevensgeneratortoepassing begint met het pompen van gegevens in het weather-data onderwerp in Kafka. U kunt ook een snelle saniteitscontrole uitvoeren om te bevestigen. Bekijk de Docker-container waarop de Kafka Connect-werkrol wordt uitgevoerd:

docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash

Zodra u zich in de containershell bevindt, start u het gebruikelijke consumentenproces van de Kafka-console en ziet u weergegevens (in JSON-indeling) binnenkomen.

cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data

Installatie van Cassandra Sink-connector

Kopieer de onderstaande JSON-inhoud naar een bestand (u kunt dit de naam geven cassandra-sink-config.json). U moet deze bijwerken op basis van uw installatie en de rest van deze sectie bevat richtlijnen voor dit onderwerp.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "weather-data",
        "contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
        "port": 10350,
        "loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
        "auth.username": "<enter username for cosmosdb account>",
        "auth.password": "<enter password for cosmosdb account>",
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
        "ssl.keystore.password": "changeit",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "offset.flush.interval.ms": 10000
    }
}

Hier volgt een samenvatting van de kenmerken:

Basisconnectiviteit

  • contactPoints: voer het contactpunt voor Azure Cosmos DB Cassandra in
  • loadBalancing.localDc: voer de regio voor het Azure Cosmos DB-account in, bijvoorbeeld Azië - zuidoost
  • auth.username: voer de gebruikersnaam in
  • auth.password: voer het wachtwoord in
  • port: voer de poortwaarde in (dit is 10350, niet 9042. laat deze staan zoals het is)

SSL-configuratie

Azure Cosmos DB dwingt beveiligde connectiviteit af via SSL en de Kafka Connect-connector biedt ook ondersteuning voor SSL.

  • ssl.keystore.path: pad naar het JDK-sleutelarchief in de container - /etc/alternatives/jre/lib/security/cacerts/
  • ssl.keystore.password: wachtwoord voor JDK-sleutelarchief (standaard)
  • ssl.hostnameValidation: We maken de validatie van de hostnaam van het eigen knooppunt
  • ssl.provider: JDK wordt gebruikt als de SSL-provider

Algemene parameters

  • key.converter: We gebruiken het tekenreeksconversieprogramma org.apache.kafka.connect.storage.StringConverter
  • value.converter: omdat de gegevens in Kafka-onderwerpen JSON zijn, maken we gebruik van org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable: Omdat er geen schema aan onze JSON-nettolading is gekoppeld (voor de demo-app), moeten we Kafka Connect instrueren om niet naar een schema te zoeken door dit kenmerk in te stellen op false. Als u dit niet doet, leidt dit tot fouten.

Installeer de connector

Installeer de connector met behulp van het Kafka Connect REST-eindpunt:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

De status controleren:

curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status

Als alles goed gaat, moet de verbindingslijn beginnen met het weven van de magie. Het moet worden geverifieerd bij Azure Cosmos DB en beginnen met het opnemen van gegevens uit het Kafka-onderwerp (weather-data) in Cassandra-tabellen - weather.data_by_state en weather.data_by_station

U kunt nu query's uitvoeren op gegevens in de tabellen. Ga naar de Azure Portal en open de gehoste CQL Shell voor uw Azure Cosmos DB-account.

CQLSH openen

Query's uitvoeren op gegevens uit Azure Cosmos DB

Controleer de data_by_state tabellen en data_by_station . Hier volgen enkele voorbeeldquery's om u op weg te helpen:

select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');

select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');

Resources opschonen

Wanneer u uw app en Azure Cosmos DB-account niet meer nodig hebt, kunt u de Azure-resources die u hebt gemaakt, verwijderen zodat er geen kosten meer voor in rekening worden gebracht. Om de resources te verwijderen:

  1. Zoek en selecteer Resourcegroepen in de zoekbalk op Azure Portal.

  2. Selecteer de resourcegroep die u eerder voor deze quickstart hebt gemaakt uit de lijst.

    Resourcegroep selecteren die moet worden verwijderd

  3. Selecteer Resourcegroep verwijderen op de pagina Overzicht van de resourcegroep.

    De resourcegroep verwijderen

  4. Selecteer in het volgende venster de naam van de resourcegroep die u wilt verwijderen en selecteer vervolgens Verwijderen.

Volgende stappen