Поделиться через


Прием данных из Apache Kafka в Azure Cosmos DB для Apache Cassandra с помощью Kafka Connect

Область применения: Кассандра

Существующие приложения Cassandra могут легко работать с Azure Cosmos DB для Apache Cassandra из-за совместимости драйвера CQLv4. Эту возможность можно использовать для интеграции с платформами потоковой передачи, такими как Apache Kafka , и передачи данных в Azure Cosmos DB.

Данные в Apache Kafka (разделы) полезны только при использовании другими приложениями или приеме в других системах. API-интерфейсы производителя и получателя Kafka позволяют создать решение с помощью выбранного языка и клиентского пакета SDK. Kafka Connect предоставляет альтернативное решение. Это платформа для надежной потоковой передачи данных между Apache Kafka и другими системами с возможностью масштабирования. Поскольку Kafka Connect поддерживает готовые соединители, включая Cassandra, вам не нужно писать пользовательский код для интеграции Kafka с Azure Cosmos DB для Apache Cassandra.

В этой статье используется соединитель DataStax Apache Kafka с открытым исходным кодом, который работает поверх платформы Kafka Connect для приема записей из топика Kafka в строки таблиц Cassandra. В примере приведена настройка с помощью Docker Compose, которая поддерживает многократную установку. В этом примере вы можете загрузить все необходимые компоненты локально с помощью одной команды. К таким компонентам относятся Kafka, Zookeeper, рабочая роль Kafka Connect и пример приложения-генератора данных.

Ниже приведена разбивка компонентов и их определений служб. Ознакомьтесь с полным docker-composeфайлом в репозитории GitHub.

  • Kafka и Zookeeper используют образы debezium.
  • Для запуска в качестве контейнера Docker соединитель DataStax Apache Kafka включен поверх существующего образа Docker: debezium/connect-base. Это изображение включает установку Kafka и его библиотек Kafka Connect, что делает удобным добавление пользовательских соединителей. Обратитесь к Dockerfile.
  • Служба data-generator заполняет раздел Kafka weather-data случайным образом сгенерированными данными (JSON). Ознакомьтесь с кодом и Dockerfile в репозитории GitHub.

Необходимые компоненты

Создание пространства ключей и таблиц и запуск конвейера интеграции

С помощью портал Azure создайте пространство ключей и таблицы Cassandra, необходимые для демонстрационного приложения.

Примечание.

Используйте те же Keyspace и имена таблиц, что и здесь.

CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Клонируйте репозиторий GitHub.

git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka

Запустите все службы.

docker-compose --project-name kafka-cosmos-cassandra up --build

Примечание.

Может потребоваться некоторое время, чтобы скачать и запустить контейнеры. Эта настройка выполняется только один раз.

Чтобы проверить, запущены ли все контейнеры, выполните следующие действия.

docker-compose -p kafka-cosmos-cassandra ps

Приложение генератора данных начинает перекачивать данные в weather-data раздел в Kafka. Вы также можете выполнить быструю проверку, чтобы подтвердить. Просмотрите контейнер Docker, выполняющий рабочую роль Kafka Connect.

docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash

После падения в оболочку контейнера запустите обычный процесс потребителя консоли Kafka. Вы должны увидеть, как данные о погоде поступают в формате JSON.

cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data

Установка соединителя приемника Cassandra

Скопируйте содержимое JSON в файл. Назовите его cassandra-sink-config.json. Вам нужно обновить его в соответствии с вашей конфигурацией. Остальная часть этого раздела содержит рекомендации.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "weather-data",
        "contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
        "port": 10350,
        "loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
        "auth.username": "<enter username for cosmosdb account>",
        "auth.password": "<enter password for cosmosdb account>",
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
        "ssl.keystore.password": "changeit",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "offset.flush.interval.ms": 10000
    }
}

Ниже приведена сводка атрибутов:

Основные возможности подключения

  • contactPoints: введите контактную точку для Azure Cosmos DB Cassandra
  • loadBalancing.localDc: введите регион для учетной записи Azure Cosmos DB, например юго-восточная Азия
  • auth.username: введите имя пользователя
  • auth.password: введите пароль
  • port: введите значение порта. Это значение 10350, а не 9042. оставьте его как есть

Настройка SSL

Azure Cosmos DB обеспечивает безопасное подключение через SSL, а соединитель Kafka Connect также поддерживает SSL.

  • ssl.keystore.path: Путь к хранилищу ключей JDK в контейнере — /etc/alternatives/jre/lib/security/cacerts/
  • ssl.keystore.password: пароль хранилища ключей JDK (по умолчанию).
  • ssl.hostnameValidation: включается собственная проверка имени узла.
  • ssl.provider: JDK используется в качестве поставщика SSL.

Универсальные параметры

  • key.converter: используется преобразователь строк org.apache.kafka.connect.storage.StringConverter.
  • value.converter: так как данные в разделах Kafka — JSON, мы используем org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable: Так как наш JSON не имеет ассоциированной схемы для демонстрационного приложения, нам нужно указать Kafka Connect не искать схему, задав этому атрибуту значение false. Это не приводит к сбоям.

Установка соединителя

Установите соединитель, используя конечную точку REST для Kafka Connect.

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

Для проверки состояния выполните следующие действия.

curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status

Если все работает правильно, соединитель должен начать выполнять свои функции. Он должен пройти проверку подлинности в Azure Cosmos DB и начать загрузку данных из топика Kafka (weather-data) в таблицы Cassandra: weather.data_by_state и weather.data_by_station.

Теперь можно запрашивать данные в таблицах. На портале Azure запустите размещенный интерфейс CQL для учетной записи Azure Cosmos DB.

Снимок экрана: портал Azure с выделенным параметром Open Cassandra Shell.

Запрос данных из Azure Cosmos DB

Проверьте таблицы data_by_state и data_by_station. Ниже приведены примеры запросов для начала работы:

select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');

select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');

Очистка ресурсов

После завершения работы с приложением и учетной записью Azure Cosmos DB можно удалить созданные ресурсы Azure, чтобы избежать дополнительных расходов. Удаление ресурсов:

  1. На панели поиска портала Azure найдите и выберите Группы ресурсов.

  2. Выберите из списка группу ресурсов, созданную для этого краткого руководства.

    Выбор группы ресурсов для удаления

  3. На странице обзора группы ресурсов выберите Удалить группу ресурсов.

    Удаление группы ресурсов

  4. В следующем окне введите имя группы ресурсов, которую требуется удалить, и щелкните Удалить.

Следующие шаги