Kafka Connect kullanarak Apache Kafka'dan Apache Cassandra için Azure Cosmos DB'ye veri alma

ŞUNLAR IÇIN GEÇERLIDIR: Cassandra

Mevcut Cassandra uygulamaları, CQLv4 sürücü uyumluluğu nedeniyle Apache Cassandra için Azure Cosmos DB ile kolayca çalışabilir. Bu özellik, Apache Kafka gibi akış platformlarıyla tümleştirmek ve verileri Azure Cosmos DB'ye getirmek için kullanılır.

Apache Kafka'daki veriler (konular) yalnızca diğer uygulamalar tarafından kullanıldığında veya başka sistemlere alındığında yararlıdır. Seçtiğiniz dil ve istemci SDK'sını kullanarakKafka Üretici/Tüketici API'lerini kullanarak bir çözüm oluşturabilirsiniz. Kafka Connect alternatif bir çözüm sağlar. Apache Kafka ile diğer sistemler arasında ölçeklenebilir ve güvenilir bir şekilde veri akışı yapmak için bir platformdur. Kafka Connect, Cassandra içeren raf bağlayıcılarını desteklediğinden, Kafka'yı Apache Cassandra için Azure Cosmos DB ile tümleştirmek için özel kod yazmanız gerekmez.

Bu makalede, kafka konu başlığındaki kayıtları bir veya daha fazla Cassandra tablosunun satırlarına almak için Kafka Connect çerçevesinin üstünde çalışan açık kaynak DataStax Apache Kafka bağlayıcısını kullanacağız. Örnek, Docker Compose kullanarak yeniden kullanılabilir bir kurulum sağlar. Tek bir komutla tüm gerekli bileşenleri yerel olarak önyüklemenize olanak sağladığından bu oldukça kullanışlıdır. Bu bileşenler Kafka, Zookeeper, Kafka Connect çalışanı ve örnek veri oluşturucu uygulamasıdır.

Bileşenlerin ve hizmet tanımlarının dökümünü burada bulabilirsiniz. GitHub deposundaki dosyanın tamamınadocker-compose bakabilirsiniz.

  • Kafka ve Zookeeper debezium görüntüleri kullanır.
  • Docker kapsayıcısı olarak çalıştırmak için DataStax Apache Kafka Bağlayıcısı mevcut Docker görüntüsünün ( debezium/connect-base) üzerinde pişirilir. Bu görüntü, Kafka'nın ve Kafka Connect kitaplıklarının bir yüklemesini içerir, bu nedenle özel bağlayıcılar eklemeyi gerçekten kullanışlı hale getirir. Dockerfile dosyasına başvurabilirsiniz.
  • Hizmet, data-generator rastgele oluşturulan (JSON) verileri Kafka konusuna weather-data dağıtır. Koda ve DockerfileGitHub deposundan başvurabilirsiniz

Önkoşullar

Keyspace, tablolar oluşturma ve tümleştirme işlem hattını başlatma

Azure portal kullanarak Cassandra Keyspace'i ve tanıtım uygulaması için gereken tabloları oluşturun.

Not

Aşağıdakiyle aynı Keyspace ve tablo adlarını kullanın

CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

GitHub deposunu kopyalayın:

git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka

Tüm hizmetleri başlatın:

docker-compose --project-name kafka-cosmos-cassandra up --build

Not

Kapsayıcıları indirip başlatmak biraz zaman alabilir: Bu yalnızca bir kerelik bir işlemdir.

Tüm kapsayıcıların başlatılıp başlatılmadığını onaylamak için:

docker-compose -p kafka-cosmos-cassandra ps

Veri oluşturucu uygulaması Kafka'da konuya veri weather-data pompalamaya başlar. Onaylamak için hızlı akıl sağlığı denetimi de yapabilirsiniz. Kafka connect çalışanını çalıştıran Docker kapsayıcısına göz atın:

docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash

Kapsayıcı kabuğuna girdikten sonra her zamanki Kafka konsolu tüketici işlemini başlatmanız ve hava durumu verilerinin (JSON biçiminde) aktığını görmeniz gerekir.

cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data

Cassandra Havuz bağlayıcısı kurulumu

Aşağıdaki JSON içeriğini bir dosyaya kopyalayın (adını cassandra-sink-config.jsonverebilirsiniz). Kurulumunuza göre güncelleştirmeniz gerekir ve bu bölümün geri kalanında bu konu hakkında yönergeler sağlanır.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "weather-data",
        "contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
        "port": 10350,
        "loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
        "auth.username": "<enter username for cosmosdb account>",
        "auth.password": "<enter password for cosmosdb account>",
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
        "ssl.keystore.password": "changeit",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "offset.flush.interval.ms": 10000
    }
}

Özniteliklerin özeti aşağıdadır:

Temel bağlantı

  • contactPoints: Azure Cosmos DB Cassandra için iletişim noktasını girin
  • loadBalancing.localDc: Azure Cosmos DB hesabının bölgesini girin; örneğin Güneydoğu Asya
  • auth.username: kullanıcı adını girin
  • auth.password: parolayı girin
  • port: bağlantı noktası değerini girin (bu 10350, değil 9042. olduğu gibi bırakın)

SSL yapılandırması

Azure Cosmos DB SSL üzerinden güvenli bağlantı uygular ve Kafka Connect bağlayıcısı da SSL'yi destekler.

  • ssl.keystore.path: kapsayıcıdaki JDK anahtar deposunun yolu - /etc/alternatives/jre/lib/security/cacerts/
  • ssl.keystore.password: JDK anahtar deposu (varsayılan) parolası
  • ssl.hostnameValidation: Kendi düğüm konak adı doğrulamayı açarız
  • ssl.provider: JDK SSL sağlayıcısı olarak kullanılır

Genel parametreler

  • key.converter: Dize dönüştürücüsü kullanılır org.apache.kafka.connect.storage.StringConverter
  • value.converter: Kafka konu başlıklarındaki veriler JSON olduğundan org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable: JSON yükümüzün kendisiyle ilişkilendirilmiş bir şeması olmadığından (tanıtım uygulamasının amaçları doğrultusunda) Kafka Connect'e bu özniteliği falseolarak ayarlayarak şema aramamasını söylememiz gerekir. Bunu yapmamak hatalara neden olur.

Bağlayıcıyı yükleme

Kafka Connect REST uç noktasını kullanarak bağlayıcıyı yükleyin:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

Durumu denetlemek için:

curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status

Her şey yolunda giderse bağlayıcı sihrini örmeye başlamalıdır. Azure Cosmos DB'de kimlik doğrulaması yapmalı ve Kafka konusundan (weather-data) Cassandra tablolarına veri almaya başlamalıdır - weather.data_by_state ve weather.data_by_station

Artık tablolardaki verileri sorgulayabilirsiniz. Azure portal gidin ve Azure Cosmos DB hesabınız için barındırılan CQL Kabuğunu açın.

CQLSH'yi açma

Azure Cosmos DB'den veri sorgulama

ve data_by_station tablolarını data_by_state denetleyin. Başlamanıza neden olacak bazı örnek sorgular aşağıda verilmiştir:

select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');

select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');

Kaynakları temizleme

Uygulamanız ve Azure Cosmos DB hesabınızla işiniz bittiğinde, daha fazla ücret ödemeden oluşturduğunuz Azure kaynaklarını silebilirsiniz. Kaynakları silmek için:

  1. Azure portal Arama çubuğunda Kaynak grupları'nı arayın ve seçin.

  2. Listeden bu hızlı başlangıç için oluşturduğunuz kaynak grubunu seçin.

    Silinecek kaynak grubunu seçin

  3. Kaynak grubuna Genel Bakış sayfasında Kaynak grubunu sil'i seçin.

    Kaynak grubunu silme

  4. Sonraki pencerede, silinecek kaynak grubunun adını girin ve sil'i seçin.

Sonraki adımlar