Pozyskiwanie danych z platformy Apache Kafka do usługi Azure Cosmos DB dla bazy danych Apache Cassandra przy użyciu platformy Kafka Connect
DOTYCZY: Cassandra
Istniejące aplikacje Cassandra mogą łatwo pracować z usługą Azure Cosmos DB dla bazy danych Apache Cassandra ze względu na zgodność sterowników CQLv4. Ta funkcja umożliwia integrację z platformami przesyłania strumieniowego, takimi jak Apache Kafka , i wprowadzanie danych do usługi Azure Cosmos DB.
Dane na platformie Apache Kafka (tematy) są przydatne tylko wtedy, gdy są używane przez inne aplikacje lub pozyskiwane do innych systemów. Istnieje możliwość utworzenia rozwiązania przy użyciu interfejsów API producentów/konsumentów platformy Kafkaprzy użyciu wybranego języka i zestawu SDK klienta. Platforma Kafka Connect udostępnia alternatywne rozwiązanie. Jest to platforma do strumieniowego przesyłania danych między platformą Apache Kafka a innymi systemami w sposób skalowalny i niezawodny. Ponieważ platforma Kafka Connect obsługuje gotowe łączniki, w tym rozwiązanie Cassandra, nie trzeba pisać niestandardowego kodu w celu zintegrowania platformy Kafka z usługą Azure Cosmos DB dla bazy danych Apache Cassandra.
W tym artykule użyjemy łącznika datastax typu open source platformy Apache Kafka, który działa na platformie Kafka Connect w celu pozyskiwania rekordów z tematu platformy Kafka do wierszy co najmniej jednej tabeli cassandra. W przykładzie przedstawiono konfigurację wielokrotnego użytku przy użyciu narzędzia Docker Compose. Jest to dość wygodne, ponieważ umożliwia uruchamianie wszystkich wymaganych składników lokalnie za pomocą jednego polecenia. Te składniki obejmują platformę Kafka, zookeeper, proces roboczy platformy Kafka Connect i przykładową aplikację generatora danych.
Poniżej przedstawiono podział składników i ich definicji usług — możesz odwołać się do kompletnego docker-compose
pliku w repozytorium GitHub.
- Platformy Kafka i zookeeper używają obrazów debezium .
- Aby uruchomić go jako kontener platformy Docker, łącznik Platformy Apache Kafka DataStax jest pieczony na podstawie istniejącego obrazu platformy Docker — debezium/connect-base. Ten obraz zawiera instalację platformy Kafka i bibliotek Platformy Kafka Connect, dzięki czemu jest to naprawdę wygodne w dodawaniu łączników niestandardowych. Możesz odwołać się do pliku Dockerfile.
- Nasiona
data-generator
usługi generowane losowo (JSON) dane do tematu platformyweather-data
Kafka. Możesz odwołać się do kodu iDockerfile
w repozytorium GitHub
Wymagania wstępne
Aprowizuj konto usługi Azure Cosmos DB dla bazy danych Apache Cassandra
Instalowanie platformy Docker i narzędzia Docker Compose
Tworzenie przestrzeni kluczy, tabel i uruchamianie potoku integracji
Korzystając z Azure Portal, utwórz przestrzeń kluczy Cassandra i tabele wymagane dla aplikacji demonstracyjnej.
Uwaga
Użyj tej samej przestrzeni kluczy i nazw tabel, jak poniżej
CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
Sklonuj repozytorium GitHub:
git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka
Uruchom wszystkie usługi:
docker-compose --project-name kafka-cosmos-cassandra up --build
Uwaga
Pobranie i uruchomienie kontenerów może zająć trochę czasu: jest to proces jednorazowy.
Aby sprawdzić, czy wszystkie kontenery zostały uruchomione:
docker-compose -p kafka-cosmos-cassandra ps
Aplikacja generatora danych rozpocznie pompowanie danych do tematu weather-data
na platformie Kafka. Możesz również wykonać szybką kontrolę sanity, aby potwierdzić. Zobacz kontener platformy Docker z uruchomionym procesem roboczym platformy Kafka Connect:
docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash
Po usunięciu do powłoki kontenera wystarczy rozpocząć zwykły proces użytkownika konsoli platformy Kafka, w którym powinny być widoczne dane pogodowe (w formacie JSON).
cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data
Konfiguracja łącznika ujścia systemu Cassandra
Skopiuj zawartość JSON poniżej do pliku (możesz go cassandra-sink-config.json
nazwać ). Należy je zaktualizować zgodnie z konfiguracją, a pozostała część tej sekcji zawiera wskazówki dotyczące tego tematu.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "weather-data",
"contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
"port": 10350,
"loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
"auth.username": "<enter username for cosmosdb account>",
"auth.password": "<enter password for cosmosdb account>",
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
"ssl.keystore.password": "changeit",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"offset.flush.interval.ms": 10000
}
}
Oto podsumowanie atrybutów:
Podstawowa łączność
contactPoints
: wprowadź punkt kontaktowy dla usługi Azure Cosmos DB CassandraloadBalancing.localDc
: wprowadź region dla konta usługi Azure Cosmos DB, np. Azja Południowo-Wschodniaauth.username
: wprowadź nazwę użytkownikaauth.password
: wprowadź hasłoport
: wprowadź wartość portu (jest to10350
, a nie9042
. Pozostaw ją tak, jak jest)
Konfiguracja protokołu SSL
Usługa Azure Cosmos DB wymusza bezpieczną łączność za pośrednictwem protokołu SSL i łącznika Kafka Connect obsługuje również protokół SSL.
ssl.keystore.path
: ścieżka do magazynu kluczy JDK w kontenerze —/etc/alternatives/jre/lib/security/cacerts/
ssl.keystore.password
: hasło magazynu kluczy zestawu JDK (domyślne)ssl.hostnameValidation
: Włączamy walidację nazwy hosta węzłassl.provider
:JDK
jest używany jako dostawca SSL
Parametry ogólne
key.converter
: Używamy konwertera ciągóworg.apache.kafka.connect.storage.StringConverter
value.converter
: ponieważ dane w tematach platformy Kafka to JSON, wykorzystujemyorg.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable
: Ponieważ ładunek JSON nie ma skojarzonego z nim schematu (na potrzeby aplikacji demonstracyjnej), musimy poinstruować platformę Kafka Connect, aby nie szukała schematu, ustawiając ten atrybut nafalse
. W przeciwnym razie wystąpią błędy.
Instalowanie łącznika
Zainstaluj łącznik przy użyciu punktu końcowego REST programu Kafka Connect:
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
Aby sprawdzić stan:
curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status
Jeśli wszystko pójdzie dobrze, łącznik powinien rozpocząć tkanie jego magii. Powinna ona uwierzytelniać się w usłudze Azure Cosmos DB i rozpocząć pozyskiwanie danych z tematu platformy Kafka (weather-data
) do tabel Cassandra — weather.data_by_state
i weather.data_by_station
Teraz możesz wykonywać zapytania o dane w tabelach. Przejdź do Azure Portal, aby wyświetlić hostowaną powłokę CQL dla konta usługi Azure Cosmos DB.
Wykonywanie zapytań dotyczących danych z usługi Azure Cosmos DB
Sprawdź tabele data_by_state
i data_by_station
. Oto kilka przykładowych zapytań, które ułatwiają rozpoczęcie pracy:
select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');
select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');
Czyszczenie zasobów
Po zakończeniu pracy z aplikacją i kontem usługi Azure Cosmos DB możesz usunąć utworzone zasoby platformy Azure, aby nie ponosić dodatkowych opłat. Aby usunąć zasoby:
Na pasku wyszukiwania Azure Portal wyszukaj i wybierz pozycję Grupy zasobów.
Z listy wybierz grupę zasobów utworzoną na potrzeby tego przewodnika Szybki start.
Na stronie Przegląd grupy zasobów wybierz pozycję Usuń grupę zasobów.
W następnym oknie wprowadź nazwę grupy zasobów do usunięcia, a następnie wybierz pozycję Usuń.