Compartir a través de


Ingesta de datos de Apache Kafka en Azure Cosmos DB for Apache Cassandra mediante Kafka Connect

SE APLICA A: Casandra

Las aplicaciones de Cassandra existentes pueden funcionar fácilmente con Azure Cosmos DB for Apache Cassandra debido a su compatibilidad con el controlador CQLv4. Puede usar esta capacidad para integrarse con plataformas de streaming, como Apache Kafka, y poner los datos en Azure Cosmos DB.

Los datos de Apache Kafka (temas) solo son útiles cuando se utilizan en otras aplicaciones o se ingieren en otros sistemas. Es posible compilar una solución con las API de productor o consumidor de Kafkamediante un SDK de cliente e idioma de su elección. Kafka Connect proporciona una solución alternativa. Es una plataforma para transmitir datos entre Apache Kafka y otros sistemas de forma escalable y confiable. Dado que Kafka Connect admite conectores de estante, que incluyen Cassandra, no es necesario escribir código personalizado para integrar Kafka con Azure Cosmos DB para Apache Cassandra.

En este artículo se usa el conector dataStax Apache Kafka de código abierto que funciona sobre el marco de Kafka Connect para ingerir registros de un tema de Kafka en filas de tablas de Cassandra. En el ejemplo se proporciona una configuración reutilizable mediante Docker Compose. Este ejemplo le permite arrancar todos los componentes necesarios localmente con un solo comando. Estos componentes incluyen Kafka, Zookeeper, trabajo de Kafka Connect y la aplicación del generador de datos de ejemplo.

Este es un desglose de los componentes y sus definiciones de servicio. Consulte el archivo completo docker-composeen el repositorio de GitHub.

  • Kafka y Zookeeper usan imágenes de debezium.
  • Para ejecutarse como un contenedor de Docker, el conector de Apache Kafka de DataStax se incluye sobre una imagen de Docker existente: debezium/connect-base. Esta imagen incluye una instalación de Kafka y sus bibliotecas de Kafka Connect, lo que facilita la adición de conectores personalizados. Consulte dockerfile.
  • El servicio data-generator proporciona los datos generados de forma aleatoria (JSON) en el tema weather-data de Kafka. Consulte el código y Dockerfile en el repositorio de GitHub.

Requisitos previos

Creación del espacio de claves y las tablas e inicio de la canalización de integración

Mediante Azure Portal, cree el espacio de claves de Cassandra y las tablas necesarias para la aplicación de demostración.

Nota:

Utiliza los mismos nombres de espacio de claves y de tablas que se utilizan aquí.

CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Clone el repositorio de GitHub:

git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka

Inicie todos los servicios:

docker-compose --project-name kafka-cosmos-cassandra up --build

Nota:

Es posible que tarde un tiempo en descargar e iniciar los contenedores. Esta configuración es solo un proceso de una sola vez.

Para confirmar si todos los contenedores se iniciaron:

docker-compose -p kafka-cosmos-cassandra ps

La aplicación del generador de datos comienza a bombear los datos en el tema weather-data de Kafka. También puede realizar una comprobación rápida para confirmar. Eche un vistazo al contenedor de Docker que ejecuta el trabajo de Kafka Connect:

docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash

Después de colocar en el shell de contenedor, inicie el proceso habitual de consumidor de la consola de Kafka. Debería ver los datos meteorológicos en formato JSON que fluyen.

cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data

Configuración del conector del receptor de Cassandra

Copie el contenido JSON aquí en un archivo. Asígnalo cassandra-sink-config.json. Debe actualizarlo según su configuración. El resto de esta sección proporciona instrucciones.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "weather-data",
        "contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
        "port": 10350,
        "loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
        "auth.username": "<enter username for cosmosdb account>",
        "auth.password": "<enter password for cosmosdb account>",
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
        "ssl.keystore.password": "changeit",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "offset.flush.interval.ms": 10000
    }
}

A continuación se muestra un resumen de los atributos:

Conectividad básica

  • contactPoints: escriba el punto de contacto para Cassandra de Azure Cosmos DB
  • loadBalancing.localDc: escriba la región de la cuenta de Azure Cosmos DB, como Sudeste asiático.
  • auth.username: escriba el nombre de usuario.
  • auth.password: escriba la contraseña.
  • port: escriba el valor del puerto. Este valor es 10350, no 9042. déjelo tal como está

Configuración de SSL

Azure Cosmos DB exige conectividad segura a través de SSL y el conector de Kafka Connect también admite SSL.

  • ssl.keystore.path: ruta de acceso al almacén de claves JDK en el contenedor: /etc/alternatives/jre/lib/security/cacerts/
  • ssl.keystore.password: contraseña del almacén de claves de JDK (predeterminada).
  • ssl.hostnameValidation: se activa la validación del nombre de host del nodo.
  • ssl.provider: JDK se utiliza como proveedor de SSL.

Parámetros genéricos

  • key.converter: Usamos el convertidor de cadenas org.apache.kafka.connect.storage.StringConverter.
  • value.converter: dado que los datos de los temas de Kafka son JSON, usamos org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable: dado que nuestra carga JSON no tiene un esquema asociado para los fines de la aplicación de demostración, es necesario indicar a Kafka Connect que no busque un esquema estableciendo este atributo falseen . Si no lo hace, se producirán errores.

Instalación del conector

Instale el conector mediante el punto de conexión REST de Kafka Connect:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

Para comprobar el estado:

curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status

Si todo va bien, el conector debería empezar a funcionar. Debe autenticarse en Azure Cosmos DB e iniciar la ingesta de datos del tema de Kafka (weather-data) en tablas de Cassandra: weather.data_by_state y weather.data_by_station.

Ahora puede consultar los datos de las tablas. En Azure Portal, prepare el shell de CQL hospedado para la cuenta de Azure Cosmos DB.

Captura de pantalla que muestra Azure Portal con la opción Abrir cassandra Shell resaltada.

Consulta de datos de Azure Cosmos DB

Compruebe las tablas data_by_state y data_by_station. Estas son algunas consultas de ejemplo para comenzar:

select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');

select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');

Limpieza de recursos

Cuando haya terminado tanto con la aplicación como con la cuenta de Azure Cosmos DB, puede eliminar los recursos de Azure que creó para no tener más gastos. Para eliminar los recursos:

  1. En la barra de búsqueda de Azure Portal, busque y seleccione Grupos de recursos.

  2. En la lista, seleccione el grupo de recursos que creó para este inicio rápido.

    Selección del grupo de recursos que se eliminará

  3. En la página Información general del grupo de recursos, seleccione Eliminar grupo de recursos.

    Eliminar el grupo de recursos

  4. En la ventana siguiente, escriba el nombre del grupo de recursos que desea eliminar y, después, seleccione Eliminar.

Pasos siguientes