Movimiento de datos mediante un conector de Kafka

Completado

Apache Kafka es una plataforma de código abierto que se usa para transmitir eventos de forma distribuida. Muchas empresas usan Kafka para escenarios de integración de datos de alto rendimiento a gran escala. Kafka Connect es una herramienta dentro de su conjunto para transmitir datos entre Kafka y otros sistemas de datos. Es comprensible que esto pueda incluir Azure Cosmos DB como origen de datos o un destino (receptor) de datos.

Configurar

Los conectores de Kafka Connect para Azure Cosmos DB están disponibles como un proyecto de código abierto en GitHub en microsoft/kafka-connect-cosmosdb. Las instrucciones para descargar e instalar manualmente el archivo JAR están disponibles en el repositorio.

Configuración

Se deben establecer cuatro propiedades de configuración para configurar correctamente la conectividad con una cuenta de Azure Cosmos DB for NoSQL.

Propiedad Valor
connect.cosmos.connection.endpoint URI del punto de conexión de la cuenta
connect.cosmos.master.key Clave de cuenta
connect.cosmos.databasename Nombre del recurso de la base de datos
connect.cosmos.containers.topicmap Mediante el formato CSV, una asignación de los temas de Kafka a los contenedores

Asignación de temas a contenedores

Cada contenedor debe asignarse a un tema. Por ejemplo, supongamos que desea que el contenedor products se asigne al tema prodlistener y el contenedor customers al tema custlistener. En ese caso, debe usar la siguiente cadena de asignación CSV: prodlistener#products,custlistener#customers.

Escritura a Azure Cosmos DB

Vamos a escribir datos en Azure Cosmos DB mediante la creación de un tema. En Apache Kafka, todos los mensajes se envían mediante temas.

Puede crear un nuevo tema mediante el comando kafka-topics. En este ejemplo se crea un nuevo tema denominado prodlistener.

kafka-topics --create \
    --zookeeper localhost:2181 \
    --topic prodlistener \
    --replication-factor 1 \
    --partitions 1

El siguiente comando iniciará un productor, por lo que podrá escribir tres registros en el tema prodlistener.

kafka-console-producer \
    --broker-list localhost:9092 \
    --topic prodlistener

Y en la consola, puede escribir estos tres registros en el tema. Una vez hecho esto, estos registros se confirman en el contenedor de Azure Cosmos DB for NoSQL asignado al tema (products).

{"id": "0ac8b014-c3f4-4db0-8a1f-434bab460938", "name": "handlebar", "categoryId": "78148556-4e84-44be-abae-9755dde9c9e3"}
{"id": "54ba00da-50cf-44d8-b122-1d18bd1db400", "name": "handlebar", "categoryId": "eb642a5e-0c6f-4c83-b96b-bb2903b85e59"}
{"id": "381dde84-e6c2-4583-b66c-e4a4116f7d6e", "name": "handlebar", "categoryId": "cf8ae707-6d74-4563-831a-06e15a70a0dc"}

Lectura desde Azure Cosmos DB

Puede crear un conector de origen en Kafka Connect con un objeto de configuración JSON. En esta configuración de ejemplo siguiente, la mayoría de las propiedades se deben dejar sin cambios, pero asegúrese de cambiar los valores siguientes:

Propiedad Descripción
connect.cosmos.connection.endpoint El URI de punto de conexión de la cuenta real
connect.cosmos.master.key El clave de la cuenta real
connect.cosmos.databasename El nombre del recurso de la base de datos de la cuenta real
connect.cosmos.containers.topicmap Mediante el formato CSV, una asignación de los temas de Kafka reales a los contenedores
{
  "name": "cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connect.cosmos.task.poll.interval": "100",
    "connect.cosmos.connection.endpoint": "<cosmos-endpoint>",
    "connect.cosmos.master.key": "<cosmos-key>",
    "connect.cosmos.databasename": "<cosmos-database>",
    "connect.cosmos.containers.topicmap": "<kafka-topic>#<cosmos-container>",
    "connect.cosmos.offset.useLatest": false,
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false"
  }
}

Como ejemplo ilustrativo, use esta tabla de configuración de ejemplo:

Propiedad Descripción
connect.cosmos.connection.endpoint https://dp420.documents.azure.com:443/
connect.cosmos.master.key C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==
connect.cosmos.databasename cosmicworks
connect.cosmos.containers.topicmap prodlistener#products

Este es un archivo de configuración de ejemplo:

{
  "name": "cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connect.cosmos.task.poll.interval": "100",
    "connect.cosmos.connection.endpoint": "https://dp420.documents.azure.com:443/",
    "connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "connect.cosmos.databasename": "cosmicworks",
    "connect.cosmos.containers.topicmap": "prodlistener#products",
    "connect.cosmos.offset.useLatest": false,
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false"
  }
}

Una vez configurados, los datos de la fuente de cambios de Azure Cosmos DB se publicarán en un tema de Kafka.