Pozyskiwanie danych z platformy Apache Kafka do usługi Azure Cosmos DB dla bazy danych Apache Cassandra przy użyciu platformy Kafka Connect

DOTYCZY: Cassandra

Istniejące aplikacje Cassandra mogą łatwo pracować z usługą Azure Cosmos DB dla bazy danych Apache Cassandra ze względu na zgodność sterowników CQLv4. Ta funkcja umożliwia integrację z platformami przesyłania strumieniowego, takimi jak Apache Kafka , i wprowadzanie danych do usługi Azure Cosmos DB.

Dane na platformie Apache Kafka (tematy) są przydatne tylko wtedy, gdy są używane przez inne aplikacje lub pozyskiwane do innych systemów. Istnieje możliwość utworzenia rozwiązania przy użyciu interfejsów API producentów/konsumentów platformy Kafkaprzy użyciu wybranego języka i zestawu SDK klienta. Platforma Kafka Connect udostępnia alternatywne rozwiązanie. Jest to platforma do strumieniowego przesyłania danych między platformą Apache Kafka a innymi systemami w sposób skalowalny i niezawodny. Ponieważ platforma Kafka Connect obsługuje gotowe łączniki, w tym rozwiązanie Cassandra, nie trzeba pisać niestandardowego kodu w celu zintegrowania platformy Kafka z usługą Azure Cosmos DB dla bazy danych Apache Cassandra.

W tym artykule użyjemy łącznika datastax typu open source platformy Apache Kafka, który działa na platformie Kafka Connect w celu pozyskiwania rekordów z tematu platformy Kafka do wierszy co najmniej jednej tabeli cassandra. W przykładzie przedstawiono konfigurację wielokrotnego użytku przy użyciu narzędzia Docker Compose. Jest to dość wygodne, ponieważ umożliwia uruchamianie wszystkich wymaganych składników lokalnie za pomocą jednego polecenia. Te składniki obejmują platformę Kafka, zookeeper, proces roboczy platformy Kafka Connect i przykładową aplikację generatora danych.

Poniżej przedstawiono podział składników i ich definicji usług — możesz odwołać się do kompletnego docker-compose pliku w repozytorium GitHub.

  • Platformy Kafka i zookeeper używają obrazów debezium .
  • Aby uruchomić go jako kontener platformy Docker, łącznik Platformy Apache Kafka DataStax jest pieczony na podstawie istniejącego obrazu platformy Docker — debezium/connect-base. Ten obraz zawiera instalację platformy Kafka i bibliotek Platformy Kafka Connect, dzięki czemu jest to naprawdę wygodne w dodawaniu łączników niestandardowych. Możesz odwołać się do pliku Dockerfile.
  • Nasiona data-generator usługi generowane losowo (JSON) dane do tematu platformy weather-data Kafka. Możesz odwołać się do kodu i Dockerfile w repozytorium GitHub

Wymagania wstępne

Tworzenie przestrzeni kluczy, tabel i uruchamianie potoku integracji

Korzystając z Azure Portal, utwórz przestrzeń kluczy Cassandra i tabele wymagane dla aplikacji demonstracyjnej.

Uwaga

Użyj tej samej przestrzeni kluczy i nazw tabel, jak poniżej

CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Sklonuj repozytorium GitHub:

git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka

Uruchom wszystkie usługi:

docker-compose --project-name kafka-cosmos-cassandra up --build

Uwaga

Pobranie i uruchomienie kontenerów może zająć trochę czasu: jest to proces jednorazowy.

Aby sprawdzić, czy wszystkie kontenery zostały uruchomione:

docker-compose -p kafka-cosmos-cassandra ps

Aplikacja generatora danych rozpocznie pompowanie danych do tematu weather-data na platformie Kafka. Możesz również wykonać szybką kontrolę sanity, aby potwierdzić. Zobacz kontener platformy Docker z uruchomionym procesem roboczym platformy Kafka Connect:

docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash

Po usunięciu do powłoki kontenera wystarczy rozpocząć zwykły proces użytkownika konsoli platformy Kafka, w którym powinny być widoczne dane pogodowe (w formacie JSON).

cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data

Konfiguracja łącznika ujścia systemu Cassandra

Skopiuj zawartość JSON poniżej do pliku (możesz go cassandra-sink-config.jsonnazwać ). Należy je zaktualizować zgodnie z konfiguracją, a pozostała część tej sekcji zawiera wskazówki dotyczące tego tematu.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "weather-data",
        "contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
        "port": 10350,
        "loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
        "auth.username": "<enter username for cosmosdb account>",
        "auth.password": "<enter password for cosmosdb account>",
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
        "ssl.keystore.password": "changeit",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "offset.flush.interval.ms": 10000
    }
}

Oto podsumowanie atrybutów:

Podstawowa łączność

  • contactPoints: wprowadź punkt kontaktowy dla usługi Azure Cosmos DB Cassandra
  • loadBalancing.localDc: wprowadź region dla konta usługi Azure Cosmos DB, np. Azja Południowo-Wschodnia
  • auth.username: wprowadź nazwę użytkownika
  • auth.password: wprowadź hasło
  • port: wprowadź wartość portu (jest to 10350, a nie 9042. Pozostaw ją tak, jak jest)

Konfiguracja protokołu SSL

Usługa Azure Cosmos DB wymusza bezpieczną łączność za pośrednictwem protokołu SSL i łącznika Kafka Connect obsługuje również protokół SSL.

  • ssl.keystore.path: ścieżka do magazynu kluczy JDK w kontenerze — /etc/alternatives/jre/lib/security/cacerts/
  • ssl.keystore.password: hasło magazynu kluczy zestawu JDK (domyślne)
  • ssl.hostnameValidation: Włączamy walidację nazwy hosta węzła
  • ssl.provider: JDK jest używany jako dostawca SSL

Parametry ogólne

  • key.converter: Używamy konwertera ciągów org.apache.kafka.connect.storage.StringConverter
  • value.converter: ponieważ dane w tematach platformy Kafka to JSON, wykorzystujemy org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable: Ponieważ ładunek JSON nie ma skojarzonego z nim schematu (na potrzeby aplikacji demonstracyjnej), musimy poinstruować platformę Kafka Connect, aby nie szukała schematu, ustawiając ten atrybut na false. W przeciwnym razie wystąpią błędy.

Instalowanie łącznika

Zainstaluj łącznik przy użyciu punktu końcowego REST programu Kafka Connect:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

Aby sprawdzić stan:

curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status

Jeśli wszystko pójdzie dobrze, łącznik powinien rozpocząć tkanie jego magii. Powinna ona uwierzytelniać się w usłudze Azure Cosmos DB i rozpocząć pozyskiwanie danych z tematu platformy Kafka (weather-data) do tabel Cassandra — weather.data_by_state i weather.data_by_station

Teraz możesz wykonywać zapytania o dane w tabelach. Przejdź do Azure Portal, aby wyświetlić hostowaną powłokę CQL dla konta usługi Azure Cosmos DB.

Otwórz protokół CQLSH

Wykonywanie zapytań dotyczących danych z usługi Azure Cosmos DB

Sprawdź tabele data_by_state i data_by_station . Oto kilka przykładowych zapytań, które ułatwiają rozpoczęcie pracy:

select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');

select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');

Czyszczenie zasobów

Po zakończeniu pracy z aplikacją i kontem usługi Azure Cosmos DB możesz usunąć utworzone zasoby platformy Azure, aby nie ponosić dodatkowych opłat. Aby usunąć zasoby:

  1. Na pasku wyszukiwania Azure Portal wyszukaj i wybierz pozycję Grupy zasobów.

  2. Z listy wybierz grupę zasobów utworzoną na potrzeby tego przewodnika Szybki start.

    Wybierz grupę zasobów do usunięcia

  3. Na stronie Przegląd grupy zasobów wybierz pozycję Usuń grupę zasobów.

    Usuwanie grupy zasobów

  4. W następnym oknie wprowadź nazwę grupy zasobów do usunięcia, a następnie wybierz pozycję Usuń.

Następne kroki