Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
GILT FÜR: Kassandra
Vorhandene Cassandra-Anwendungen können aufgrund der Kompatibilität der CQLv4-Treiber problemlos mit Azure Cosmos DB for Apache Cassandra zusammenarbeiten. Sie nutzen diese Fähigkeit, um Streamingplattformen wie Apache Kafka zu integrieren und Daten in Azure Cosmos DB einzubinden.
Daten in Apache Kafka (Themen) sind nur nützlich, wenn Sie von anderen Anwendungen verwendet oder in anderen Systemen erfasst werden. Sie können eine Lösung über die Producer- und Consumer-APIs von Kafka mit einer Sprache und einem Client-SDK Ihrer Wahl erstellen. Kafka Connect bietet eine alternative Lösung. Es ist eine Plattform zum Streamen von Daten zwischen Apache Kafka und anderen Systemen auf skalierbare und zuverlässige Weise. Da Kafka Connect serienmäßige Konnektoren unterstützt, darunter Cassandra, müssen Sie keinen benutzerdefinierten Code schreiben, um Kafka mit Azure Cosmos DB für Apache Cassandra zu integrieren.
In diesem Artikel wird der Open-Source DataStax Apache Kafka-Connector verwendet, der über kafka Connect Framework funktioniert, um Datensätze aus einem Kafka-Thema in Zeilen von Cassandra-Tabellen aufzunehmen. Das Beispiel enthält eine wiederverwendbare Einrichtung mithilfe von Docker Compose. In diesem Beispiel können Sie alle erforderlichen Komponenten lokal mit einem einzigen Befehl bootstrapieren. Zu diese Komponenten gehören Kafka, Zookeeper, Kafka Connect-Worker und die Datengenerator-Beispielanwendung.
Hier ist eine Aufschlüsselung der Komponenten und deren Dienstdefinitionen. Verweisen Sie auf die vollständige docker-compose
Datei im GitHub-Repository.
- Kafka und Zookeeper verwenden Debezium-Images.
- Um als Docker-Container auszuführen, ist der DataStax Apache Kafka Connector über einem vorhandenen Docker-Image enthalten: debezium/connect-base. Dieses Image enthält eine Installation von Kafka und seinen Kafka Connect-Bibliotheken, wodurch es praktisch ist, benutzerdefinierte Connectors hinzuzufügen. Verweisen Sie auf die Dockerfile-Datei.
- Der
data-generator
-Dienst fügt zufällig generierte (JSON-) Daten in das Kafka-Themaweather-data
ein. Verweise auf den Code undDockerfile
im GitHub-Repo.
Voraussetzungen
- Stellen Sie ein Azure Cosmos DB for Apache Cassandra-Konto bereit.
- Verwenden Sie cqlsh für die Überprüfung
- Installieren von Docker und Docker Compose
Erstellen von Keyspace und Tabellen und Starten der Integrationspipeline
Erstellen Sie mithilfe des Azure-Portals den Cassandra-Keyspace und die Tabellen, die für die Demoanwendung erforderlich sind.
Hinweis
Verwenden Sie die gleichen Keyspace- und Tabellennamen wie hier.
CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
Klonen Sie das GitHub-Repository:
git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka
Starten Sie alle Dienste:
docker-compose --project-name kafka-cosmos-cassandra up --build
Hinweis
Es kann eine Weile dauern, bis die Container heruntergeladen und gestartet werden. Dieses Setup ist nur ein einmaliger Prozess.
So bestätigen Sie, ob alle Container gestartet wurden:
docker-compose -p kafka-cosmos-cassandra ps
Die Datengeneratoranwendung beginnt, das Thema weather-data
in Kafka mit Daten zu füllen. Sie können auch eine schnelle Prüfung durchführen, um dies zu bestätigen. Sehen Sie sich den Docker-Container an, der den Kafka Connect-Worker ausführt:
docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash
Starten Sie nach dem Ablegen in die Containershell den üblichen Kafka-Konsolen-Heimanwenderprozess. Wetterdaten sollten im JSON-Format angezeigt werden.
cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data
Einrichtung des Cassandra Sink-Connectors
Kopieren Sie den JSON-Inhalt hier in eine Datei. Nennen Sie es cassandra-sink-config.json
. Sie müssen sie nach Ihrem Setup aktualisieren. Der Rest dieses Abschnitts enthält Anleitungen.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "weather-data",
"contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
"port": 10350,
"loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
"auth.username": "<enter username for cosmosdb account>",
"auth.password": "<enter password for cosmosdb account>",
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
"ssl.keystore.password": "changeit",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"offset.flush.interval.ms": 10000
}
}
Im Folgenden finden Sie eine Übersicht über die Attribute:
Grundlegende Konnektivität
contactPoints
: Geben Sie den Kontaktpunkt für Azure Cosmos DB Cassandra ein.loadBalancing.localDc
: Geben Sie die Region für das Azure Cosmos DB-Konto ein, z. B. Südostasienauth.username
: Geben Sie den Benutzernamen ein.auth.password
: Geben Sie das Kennwort ein.port
: Geben Sie den Portwert ein. Dieser Wert ist10350
, nicht9042
. lassen, wie es ist
SSL-Konfiguration
Azure Cosmos DB erzwingt sichere Konnektivität über SSL und der Kafka Connect-Connector unterstützt ebenfalls SSL.
ssl.keystore.path
: Pfad zum JDK-Schlüsselspeicher im Container -/etc/alternatives/jre/lib/security/cacerts/
ssl.keystore.password
: Kennwort (Standard) für den JDK-Keystoressl.hostnameValidation
: Die Überprüfung des Knotenhostnamens wird aktiviert.ssl.provider
:JDK
wird als SSL-Anbieter verwendet.
Generische Parameter
key.converter
: Der Zeichenfolgenkonverterorg.apache.kafka.connect.storage.StringConverter
wird verwendet.value.converter
: Da die Daten in Kafka-Themen JSON sind, verwenden wirorg.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable
: Da unsere JSON-Nutzlast für die Zwecke der Demo-App kein Schema zugeordnet ist, müssen wir Kafka Connect anweisen, nicht nach einem Schema zu suchen, indem sie dieses Attribut auffalse
festlegen. Andernfalls treten Fehler auf.
Installieren des Connectors
Installieren Sie den Connector mithilfe des Kafka Connect-REST-Endpunkts:
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
Überprüfen Sie den Status wie folgt:
curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status
Wenn alles gut läuft, sollte der Connector mit seinem Wunderwerk beginnen. Es sollte sich bei Azure Cosmos DB authentifizieren und mit dem Aufnehmen von Daten aus dem Kafka-Thema (weather-data
) in Cassandra-Tabellen beginnen: weather.data_by_state
und weather.data_by_station
.
Sie können nun Daten in den Tabellen abfragen. Rufen Sie im Azure-Portal die gehostete CQL-Shell für Ihr Azure Cosmos DB-Konto auf.
Abfragen von Daten aus Azure Cosmos DB
Überprüfen Sie die Tabellen data_by_state
und data_by_station
. Es folgen einige Beispielabfragen für den Einstieg:
select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');
select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');
Bereinigen von Ressourcen
Wenn Sie Ihre App und das Azure Cosmos DB-Konto fertiggestellt haben, können Sie die erstellten Azure-Ressourcen löschen, damit keine weiteren Gebühren anfallen. So löschen Sie die Ressourcen:
Suchen Sie über die Suchleiste des Azure-Portals nach Ressourcengruppen, und wählen Sie die entsprechende Option aus.
Wählen Sie in der Liste die Ressourcengruppe aus, die Sie für diesen Schnellstart erstellt haben.
Wählen Sie auf der Seite Übersicht der Ressourcengruppe die Option Ressourcengruppe löschen aus.
Geben Sie in dem nächsten Fenster den Namen der zu löschenden Ressourcengruppe ein, und wählen Sie dann Löschen aus.