Movimiento de datos mediante un conector de Kafka
Apache Kafka es una plataforma de código abierto que se usa para transmitir eventos de forma distribuida. Muchas empresas usan Kafka para escenarios de integración de datos de alto rendimiento a gran escala. Kafka Connect es una herramienta dentro de su conjunto para transmitir datos entre Kafka y otros sistemas de datos. Es comprensible que esto pueda incluir Azure Cosmos DB como origen de datos o un destino (receptor) de datos.
Configurar
Los conectores de Kafka Connect para Azure Cosmos DB están disponibles como un proyecto de código abierto en GitHub en microsoft/kafka-connect-cosmosdb. Las instrucciones para descargar e instalar manualmente el archivo JAR están disponibles en el repositorio.
Configuración
Se deben establecer cuatro propiedades de configuración para configurar correctamente la conectividad con una cuenta de Azure Cosmos DB for NoSQL.
Propiedad | Valor |
---|---|
connect.cosmos.connection.endpoint | URI del punto de conexión de la cuenta |
connect.cosmos.master.key | Clave de cuenta |
connect.cosmos.databasename | Nombre del recurso de la base de datos |
connect.cosmos.containers.topicmap | Mediante el formato CSV, una asignación de los temas de Kafka a los contenedores |
Asignación de temas a contenedores
Cada contenedor debe asignarse a un tema. Por ejemplo, supongamos que desea que el contenedor products se asigne al tema prodlistener y el contenedor customers al tema custlistener. En ese caso, debe usar la siguiente cadena de asignación CSV: prodlistener#products,custlistener#customers
.
Escritura a Azure Cosmos DB
Vamos a escribir datos en Azure Cosmos DB mediante la creación de un tema. En Apache Kafka, todos los mensajes se envían mediante temas.
Puede crear un nuevo tema mediante el comando kafka-topics. En este ejemplo se crea un nuevo tema denominado prodlistener.
kafka-topics --create \
--zookeeper localhost:2181 \
--topic prodlistener \
--replication-factor 1 \
--partitions 1
El siguiente comando iniciará un productor, por lo que podrá escribir tres registros en el tema prodlistener.
kafka-console-producer \
--broker-list localhost:9092 \
--topic prodlistener
Y en la consola, puede escribir estos tres registros en el tema. Una vez hecho esto, estos registros se confirman en el contenedor de Azure Cosmos DB for NoSQL asignado al tema (products).
{"id": "0ac8b014-c3f4-4db0-8a1f-434bab460938", "name": "handlebar", "categoryId": "78148556-4e84-44be-abae-9755dde9c9e3"}
{"id": "54ba00da-50cf-44d8-b122-1d18bd1db400", "name": "handlebar", "categoryId": "eb642a5e-0c6f-4c83-b96b-bb2903b85e59"}
{"id": "381dde84-e6c2-4583-b66c-e4a4116f7d6e", "name": "handlebar", "categoryId": "cf8ae707-6d74-4563-831a-06e15a70a0dc"}
Lectura desde Azure Cosmos DB
Puede crear un conector de origen en Kafka Connect con un objeto de configuración JSON. En esta configuración de ejemplo siguiente, la mayoría de las propiedades se deben dejar sin cambios, pero asegúrese de cambiar los valores siguientes:
Propiedad | Descripción |
---|---|
connect.cosmos.connection.endpoint | El URI de punto de conexión de la cuenta real |
connect.cosmos.master.key | El clave de la cuenta real |
connect.cosmos.databasename | El nombre del recurso de la base de datos de la cuenta real |
connect.cosmos.containers.topicmap | Mediante el formato CSV, una asignación de los temas de Kafka reales a los contenedores |
{
"name": "cosmosdb-source-connector",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connect.cosmos.task.poll.interval": "100",
"connect.cosmos.connection.endpoint": "<cosmos-endpoint>",
"connect.cosmos.master.key": "<cosmos-key>",
"connect.cosmos.databasename": "<cosmos-database>",
"connect.cosmos.containers.topicmap": "<kafka-topic>#<cosmos-container>",
"connect.cosmos.offset.useLatest": false,
"value.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false"
}
}
Como ejemplo ilustrativo, use esta tabla de configuración de ejemplo:
Propiedad | Descripción |
---|---|
connect.cosmos.connection.endpoint | https://dp420.documents.azure.com:443/ |
connect.cosmos.master.key | C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw== |
connect.cosmos.databasename | cosmicworks |
connect.cosmos.containers.topicmap | prodlistener#products |
Este es un archivo de configuración de ejemplo:
{
"name": "cosmosdb-source-connector",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connect.cosmos.task.poll.interval": "100",
"connect.cosmos.connection.endpoint": "https://dp420.documents.azure.com:443/",
"connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
"connect.cosmos.databasename": "cosmicworks",
"connect.cosmos.containers.topicmap": "prodlistener#products",
"connect.cosmos.offset.useLatest": false,
"value.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false"
}
}
Una vez configurados, los datos de la fuente de cambios de Azure Cosmos DB se publicarán en un tema de Kafka.