Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Область применения:
Кассандра
Существующие приложения Cassandra могут легко работать с Azure Cosmos DB для Apache Cassandra из-за совместимости драйвера CQLv4. Эту возможность можно использовать для интеграции с платформами потоковой передачи, такими как Apache Kafka , и передачи данных в Azure Cosmos DB.
Данные в Apache Kafka (разделы) полезны только при использовании другими приложениями или приеме в других системах. API-интерфейсы производителя и получателя Kafka позволяют создать решение с помощью выбранного языка и клиентского пакета SDK. Kafka Connect предоставляет альтернативное решение. Это платформа для надежной потоковой передачи данных между Apache Kafka и другими системами с возможностью масштабирования. Поскольку Kafka Connect поддерживает готовые соединители, включая Cassandra, вам не нужно писать пользовательский код для интеграции Kafka с Azure Cosmos DB для Apache Cassandra.
В этой статье используется соединитель DataStax Apache Kafka с открытым исходным кодом, который работает поверх платформы Kafka Connect для приема записей из топика Kafka в строки таблиц Cassandra. В примере приведена настройка с помощью Docker Compose, которая поддерживает многократную установку. В этом примере вы можете загрузить все необходимые компоненты локально с помощью одной команды. К таким компонентам относятся Kafka, Zookeeper, рабочая роль Kafka Connect и пример приложения-генератора данных.
Ниже приведена разбивка компонентов и их определений служб. Ознакомьтесь с полным docker-compose
файлом в репозитории GitHub.
- Kafka и Zookeeper используют образы debezium.
- Для запуска в качестве контейнера Docker соединитель DataStax Apache Kafka включен поверх существующего образа Docker: debezium/connect-base. Это изображение включает установку Kafka и его библиотек Kafka Connect, что делает удобным добавление пользовательских соединителей. Обратитесь к Dockerfile.
- Служба
data-generator
заполняет раздел Kafkaweather-data
случайным образом сгенерированными данными (JSON). Ознакомьтесь с кодом иDockerfile
в репозитории GitHub.
Необходимые компоненты
- Подготовка учетной записи Azure Cosmos DB для Apache Cassandra
- Используйте cqlsh для проверки.
- Установка Docker и Docker Compose
Создание пространства ключей и таблиц и запуск конвейера интеграции
С помощью портал Azure создайте пространство ключей и таблицы Cassandra, необходимые для демонстрационного приложения.
Примечание.
Используйте те же Keyspace и имена таблиц, что и здесь.
CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
Клонируйте репозиторий GitHub.
git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka
Запустите все службы.
docker-compose --project-name kafka-cosmos-cassandra up --build
Примечание.
Может потребоваться некоторое время, чтобы скачать и запустить контейнеры. Эта настройка выполняется только один раз.
Чтобы проверить, запущены ли все контейнеры, выполните следующие действия.
docker-compose -p kafka-cosmos-cassandra ps
Приложение генератора данных начинает перекачивать данные в weather-data
раздел в Kafka. Вы также можете выполнить быструю проверку, чтобы подтвердить. Просмотрите контейнер Docker, выполняющий рабочую роль Kafka Connect.
docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash
После падения в оболочку контейнера запустите обычный процесс потребителя консоли Kafka. Вы должны увидеть, как данные о погоде поступают в формате JSON.
cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data
Установка соединителя приемника Cassandra
Скопируйте содержимое JSON в файл. Назовите его cassandra-sink-config.json
. Вам нужно обновить его в соответствии с вашей конфигурацией. Остальная часть этого раздела содержит рекомендации.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "weather-data",
"contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
"port": 10350,
"loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
"auth.username": "<enter username for cosmosdb account>",
"auth.password": "<enter password for cosmosdb account>",
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
"ssl.keystore.password": "changeit",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"offset.flush.interval.ms": 10000
}
}
Ниже приведена сводка атрибутов:
Основные возможности подключения
-
contactPoints
: введите контактную точку для Azure Cosmos DB Cassandra -
loadBalancing.localDc
: введите регион для учетной записи Azure Cosmos DB, например юго-восточная Азия -
auth.username
: введите имя пользователя -
auth.password
: введите пароль -
port
: введите значение порта. Это значение10350
, а не9042
. оставьте его как есть
Настройка SSL
Azure Cosmos DB обеспечивает безопасное подключение через SSL, а соединитель Kafka Connect также поддерживает SSL.
-
ssl.keystore.path
: Путь к хранилищу ключей JDK в контейнере —/etc/alternatives/jre/lib/security/cacerts/
-
ssl.keystore.password
: пароль хранилища ключей JDK (по умолчанию). -
ssl.hostnameValidation
: включается собственная проверка имени узла. -
ssl.provider
:JDK
используется в качестве поставщика SSL.
Универсальные параметры
-
key.converter
: используется преобразователь строкorg.apache.kafka.connect.storage.StringConverter
. -
value.converter
: так как данные в разделах Kafka — JSON, мы используемorg.apache.kafka.connect.json.JsonConverter
-
value.converter.schemas.enable
: Так как наш JSON не имеет ассоциированной схемы для демонстрационного приложения, нам нужно указать Kafka Connect не искать схему, задав этому атрибуту значениеfalse
. Это не приводит к сбоям.
Установка соединителя
Установите соединитель, используя конечную точку REST для Kafka Connect.
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
Для проверки состояния выполните следующие действия.
curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status
Если все работает правильно, соединитель должен начать выполнять свои функции. Он должен пройти проверку подлинности в Azure Cosmos DB и начать загрузку данных из топика Kafka (weather-data
) в таблицы Cassandra: weather.data_by_state
и weather.data_by_station
.
Теперь можно запрашивать данные в таблицах. На портале Azure запустите размещенный интерфейс CQL для учетной записи Azure Cosmos DB.
Запрос данных из Azure Cosmos DB
Проверьте таблицы data_by_state
и data_by_station
. Ниже приведены примеры запросов для начала работы:
select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');
select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');
Очистка ресурсов
После завершения работы с приложением и учетной записью Azure Cosmos DB можно удалить созданные ресурсы Azure, чтобы избежать дополнительных расходов. Удаление ресурсов:
На панели поиска портала Azure найдите и выберите Группы ресурсов.
Выберите из списка группу ресурсов, созданную для этого краткого руководства.
На странице обзора группы ресурсов выберите Удалить группу ресурсов.
В следующем окне введите имя группы ресурсов, которую требуется удалить, и щелкните Удалить.