Compartir a través de


Kafka Connect para Azure Cosmos DB: conector receptor

SE APLICA A: NoSQL

Kafka Connect para Azure Cosmos DB es un conector para leer y escribir datos en Azure Cosmos DB. El conector receptor de Azure Cosmos DB permite exportar datos de temas de Apache Kafka a una base de datos de Azure Cosmos DB. El conector sondea los datos de Kafka para escribir en contenedores de la base de datos en función de la suscripción de temas.

Requisitos previos

  • Comience con la configuración de la plataforma de Confluent, ya que le proporciona un entorno completo con el que trabajar. Si no quiere usar la plataforma de Confluent, debe instalar y configurar usted Apache Kafka y Kafka Connect. También deberá instalar y configurar los conectores de Azure Cosmos DB manualmente.
  • Guía de configuración para crear una cuenta y un contenedor de Azure Cosmos DB.
  • Shell de Bash, que se ha probado en GitHub Codespaces, Mac, Ubuntu y Windows con WSL2. Este shell no funciona en Cloud Shell ni WSL1.
  • Descarga de Java 11+
  • Descarga de Maven

Instalación del conector receptor

Si usa la configuración recomendada de la plataforma de Confluent, el conector receptor de Azure Cosmos DB se incluye en la instalación y puede omitir este paso.

De lo contrario, puede descargar el archivo JAR de la última versión o empaquetar este repositorio para crear un archivo JAR. Para instalar el conector manualmente mediante el archivo JAR, consulte estas instrucciones. También puede empaquetar un nuevo archivo JAR desde el código fuente.

# clone the kafka-connect-cosmosdb repo if you haven't done so already
git clone https://github.com/microsoft/kafka-connect-cosmosdb.git
cd kafka-connect-cosmosdb

# package the source code into a JAR file
mvn clean package

# include the following JAR file in Kafka Connect installation
ls target/*dependencies.jar

Creación de un tema de Kafka y escritura de datos

Si usa la plataforma Confluent, la manera más fácil de crear un tema de Kafka es mediante la experiencia de usuario del Centro de control proporcionada. De lo contrario, puede crear manualmente un tema de Kafka con la sintaxis siguiente:

./kafka-topics.sh --create --boostrap-server <URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>

En este escenario vamos a crear un tema de Kafka llamado "hotels" y a escribir datos JSON insertados sin esquema en el tema. Para crear un tema en el centro de control, consulte la guía de Confluent.

A continuación, inicie el productor de la consola de Kafka para escribir algunos registros en el tema "hotels".

# Option 1: If using Codespaces, use the built-in CLI utility
kafka-console-producer --broker-list localhost:9092 --topic hotels

# Option 2: Using this repo's Confluent Platform setup, first exec into the broker container
docker exec -it broker /bin/bash
kafka-console-producer --broker-list localhost:9092 --topic hotels

# Option 3: Using your Confluent Platform setup and CLI install
<path-to-confluent>/bin/kafka-console-producer --broker-list <kafka broker hostname> --topic hotels

En el productor de la consola, escriba lo siguiente:

{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"}
{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"}

Los tres registros especificados se publican en el tema de Kafka "hotels" en formato JSON.

Creación del conector receptor

Cree un conector receptor de Azure Cosmos DB en Kafka Connect. El siguiente cuerpo JSON define la configuración del conector receptor. Asegúrese de reemplazar los valores de connect.cosmos.connection.endpoint y connect.cosmos.master.key, las propiedades que debería haber guardado de la guía de configuración de Azure Cosmos DB en los requisitos previos.

Para más información sobre cada una de estas propiedades de configuración, consulte las propiedades del receptor.

{
  "name": "cosmosdb-sink-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
    "tasks.max": "1",
    "topics": [
      "hotels"
    ],
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
    "connect.cosmos.master.key": "<cosmosdbprimarykey>",
    "connect.cosmos.databasename": "kafkaconnect",
    "connect.cosmos.containers.topicmap": "hotels#kafka"
  }
}

Una vez que haya rellenado todos los valores, guarde el archivo JSON en algún lugar de forma local. Puede usar este archivo para crear el conector mediante la API de REST.

Creación de un conector mediante el centro de control

Una opción fácil para crear el conector es desde la página web del centro de control. Siga esta guía de instalación para crear un conector desde el centro de control. En lugar de usar la opción DatagenConnector, use el icono CosmosDBSinkConnector. Al configurar el conector receptor, rellene los valores según lo haya hecho en el archivo JSON.

Como alternativa, en la página de los conectores, puede cargar el archivo JSON creado anteriormente mediante la opción Upload connector config file (Cargar archivo de configuración del receptor).

Captura de pantalla de la opción

Creación de un conector mediante la API de REST

Creación del conector receptor mediante la API de REST de Connect:

# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors

Confirmación de los datos escritos en Azure Cosmos DB

Inicie sesión en Azure Portal y vaya a su cuenta de Azure Cosmos DB. Compruebe que los tres registros del tema "hotels" están creados en su cuenta.

Limpieza

Para eliminar el conector desde el centro de control, vaya al conector receptor que creó y seleccione el icono Eliminar.

Captura de pantalla de la opción de eliminación en el cuadro de diálogo del conector del receptor.

Como alternativa, use la API de REST de Connect para la eliminación:

# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector

Para eliminar el servicio Azure Cosmos DB creado y su grupo de recursos mediante la CLI de Azure, consulte estos pasos.

Propiedades de configuración del receptor

La siguiente configuración se usa para configurar el conector receptor de Kafka para Azure Cosmos DB. Estos valores de configuración determinan qué datos de los temas de Kafka se consumen, qué datos del contenedor de Azure Cosmos DB se escriben en ellos y los formatos para serializar los datos. Para obtener un archivo de configuración de ejemplo con los valores predeterminados, consulte esta configuración.

Nombre Escribir Descripción Obligatorio/opcional
Temas lista Una lista de los temas de Kafka que se verán. Requerido
connector.class cuerda / cadena Nombre de clase del receptor de Azure Cosmos DB. Se debe establecer en com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector. Requerido
connect.cosmos.connection.endpoint Uri Cadena del URI del punto de conexión de Azure Cosmos DB. Requerido
connect.cosmos.master.key cuerda / cadena La clave principal de Azure Cosmos DB a la que se conecta el receptor. Requerido
connect.cosmos.nombredelabase_de_datos cuerda / cadena Nombre de la base de datos de Azure Cosmos DB en la que escribe el receptor. Requerido
connect.cosmos.containers.topicmap cuerda / cadena Asignación entre temas de Kafka y contenedores de Azure Cosmos DB, con formato CSV como se muestra: topic#container,topic2#container2. Requerido
connect.cosmos.connection.gateway.enabled boolean Marca para indicar si se va a usar el modo de puerta de enlace. De forma predeterminada, es false. Opcional
connect.cosmos.sink.bulk.enabled boolean Marca para indicar si el modo masivo está habilitado. De forma predeterminada, es "true". Opcional
connect.cosmos.sink.maxRetryCount Int Número máximo de reintentos en errores de escritura transitorios. De forma predeterminada, es 10 veces. Opcional
key.converter cuerda / cadena Formato de serialización para los datos clave escritos en el tema de Kafka. Requerido
Conversor de valores cuerda / cadena Formato de serialización para los datos de valor escritos en el tema de Kafka. Requerido
key.converter.schemas.enable cuerda / cadena Establezca el valor en "true" si los datos de clave tienen un esquema insertado. Opcional
value.converter.schemas.enable cuerda / cadena Establezca el valor en "true" si los datos de clave tienen un esquema insertado. Opcional
tasks.max Int Número máximo de tare as de receptor del conector. Valor predeterminado: 1 Opcional

Los datos siempre se escribirán en Azure Cosmos DB como JSON sin ningún esquema.

Tipos de datos admitidos

El conector receptor de Azure Cosmos DB convierte el registro de receptor en un documento JSON que admite los siguientes tipos de esquema:

Tipo de esquema Tipo de datos JSON
Array Array
Boolean Booleano
Float32 Número
Float64 Número
Int8 Número
Int16 Número
Int32 Número
Int64 Número
Mapa Objeto (JSON)
Cuerda Cuerda
Nulo
Estructura Objeto (JSON)

El conector receptor también admite los siguientes tipos lógicos de AVRO:

Tipo de esquema Tipo de datos JSON
Fecha Número
Hora Número
Marca de tiempo Número

Nota:

Actualmente, la deserialización de bytes no es compatible con el conector receptor de Azure Cosmos DB.

Transformaciones simples de mensajes (SMT)

Junto con la configuración del conector receptor, puede especificar el uso de transformaciones simples de mensajes (SMT) para modificar los mensajes que fluyen a través de la plataforma de Kafka Connect. Para más información, consulte la documentación sobre SMT de Confluent.

Uso de SMT InsertUUID

Puede usar SMT InsertUUID para agregar automáticamente los identificadores de elementos. Con el SMT InsertUUID personalizado, puede insertar el campo id con un valor UUID aleatorio para cada mensaje antes de que se escriba en Azure Cosmos DB.

Advertencia

Use este SMT solo si los mensajes no contienen el campo id. De lo contrario, los valores id se sobrescribirán y es posible que termine con elementos duplicados en la base de datos. El uso de UUID como identificador de mensaje puede ser rápido y sencillo, pero no es una clave de partición ideal para usarla en Azure Cosmos DB.

Instalación de SMT

Para poder usar SMT InsertUUID, deberá instalar esta transformación en la configuración de la plataforma de Confluent. Si usa el programa de instalación de la plataforma de Confluent de este repositorio, la transformación ya está incluida en la instalación y puede omitir este paso.

Como alternativa, puede empaquetar el origen InsertUUID para crear un archivo JAR. Para instalar el conector manualmente mediante el archivo JAR, consulte estas instrucciones.

# clone the kafka-connect-insert-uuid repo
https://github.com/confluentinc/kafka-connect-insert-uuid.git
cd kafka-connect-insert-uuid

# package the source code into a JAR file
mvn clean package

# include the following JAR file in Confluent Platform installation
ls target/*.jar

Configuración de SMT

Dentro de la configuración del conector receptor, agregue las siguientes propiedades para establecer id.

"transforms": "insertID",
"transforms.insertID.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.insertID.uuid.field.name": "id"

Para obtener más información sobre el uso de esta SMT, vea el repositorio InsertUUID.

Uso de SMT para configurar el período de vida (TTL)

Con las SMT InsertField y Cast, puede configurar el TTL en cada elemento creado en Azure Cosmos DB. Habilite el TTL en el contenedor antes de habilitar el TTL en un nivel de elemento. Para más información, vea la documentación sobre el Período de vida.

Dentro de la configuración del conector receptor, agregue las siguientes propiedades para establecer el TTL en segundos. En este ejemplo, el TTL se establece en 100 segundos. Si el mensaje ya contiene el campo TTL, estas SMT sobrescribirán el valor TTL.

"transforms": "insertTTL,castTTLInt",
"transforms.insertTTL.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTTL.static.field": "ttl",
"transforms.insertTTL.static.value": "100",
"transforms.castTTLInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTTLInt.spec": "ttl:int32"

Para más información sobre el uso de estas SMT, vea la documentación sobre InsertField y Cast.

Solución de problemas habituales

Estas son las soluciones a algunos problemas comunes que pueden surgir al trabajar con el conector receptor de Kafka.

Lectura de datos no JSON con JsonConverter

Si tiene datos que no son JSON en el tema de origen en Kafka e intenta leerlos mediante JsonConverter, verá la excepción siguiente:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
...
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7

Este error probablemente se deba a que los datos del tema de origen están serializados en Avro u otro formato, como la cadena CSV.

Solución: si los datos del tema están en formato AVRO, cambie el conector receptor de Kafka Connect para que use AvroConverter como se muestra a continuación.

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

Compatibilidad con el modo de puerta de enlace

connect.cosmos.connection.gateway.enabled es una opción de configuración para el conector receptor de Kafka en Cosmos DB que mejora la ingesta de datos mediante el servicio de puerta de enlace de Cosmos DB. Este servicio actúa como front-end para Cosmos DB, lo que ofrece ventajas como el equilibrio de carga, el enrutamiento de solicitudes y la traducción de protocolos. Al aprovechar el servicio de puerta de enlace, el conector logra un rendimiento y escalabilidad mejorados al escribir datos en Cosmos DB. Para más información, consulte los modos de conectividad.

"connect.cosmos.connection.gateway.enabled": true

Compatibilidad con el modo masivo

La propiedad connect.cosmos.sink.bulk.enabled determina si la característica de escritura masiva está habilitada para escribir datos de temas de Kafka en Azure Cosmos DB.

Cuando esta propiedad se establece en true (de forma predeterminada), habilita el modo de escritura masiva, lo que permite a Kafka Connect usar la API de importación masiva de Azure Cosmos DB para realizar escrituras por lotes eficaces mediante el método CosmosContainer.executeBulkOperations(). El modo de escritura masiva mejora significativamente el rendimiento de escritura y reduce la latencia general al ingerir datos en Cosmos DB en comparación con el modo no masivo cuando se usa el método CosmosContainer.upsertItem().

El modo masivo está habilitado de manera predeterminada. Para deshabilitar la propiedad connect.cosmos.sink.bulk.enabled, debe establecerla en false en la configuración del conector receptor de Cosmos DB. A continuación, se muestra un ejemplo de archivo de configuración de propiedad:

"name": "my-cosmosdb-connector",
"connector.class": "io.confluent.connect.azure.cosmosdb.CosmosDBSinkConnector",
"tasks.max": 1,
"topics": "my-topic"
"connect.cosmos.endpoint": "https://<cosmosdb-account>.documents.azure.com:443/"
"connect.cosmos.master.key": "<cosmosdb-master-key>"
"connect.cosmos.database": "my-database"
"connect.cosmos.collection": "my-collection"
"connect.cosmos.sink.bulk.enabled": false

Al habilitar la propiedad connect.cosmos.sink.bulk.enabled, puede aprovechar la funcionalidad de escritura masiva de Kafka Connect para Azure Cosmos DB a fin de lograr un rendimiento de escritura mejorado al replicar datos de temas de Kafka en Azure Cosmos DB.

"connect.cosmos.sink.bulk.enabled": true

Lectura de datos que no son Avro con AvroConverter

Este escenario es aplicable al intentar usar el convertidor de Avro para leer datos de un tema que no está en formato Avro. Incluye datos escritos por un serializador Avro distinto del serializador Avro del registro de esquema de Confluent, que tiene su propio formato de conexión.

org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
...
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Solución: compruebe el formato de serialización del tema de origen. A continuación, cambie el conector receptor de Kafka Connect para que use el convertidor apropiado o cambie el formato ascendente a Avro.

Lectura de un mensaje JSON sin la estructura de esquema/carga útil esperada

Kafka Connect admite una estructura especial de mensajes JSON que contienen la carga útil y el esquema como se muestra a continuación.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      }
    ]
  },
  "payload": {
    "userid": 123,
    "name": "Sam"
  }
}

Si intenta leer datos JSON que no contienen los datos de esta estructura, recibirá el siguiente error:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Para que sea claro, la única estructura JSON válida para schemas.enable=true tiene campos de esquema y carga útil como elementos de nivel superior, como se muestra anteriormente. Como indica el mensaje de error, si solo tiene datos JSON sin formato, debe cambiar la configuración del conector por lo siguiente:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

Limitaciones

  • No se admite la creación automática de bases de datos y contenedores en Azure Cosmos DB. La base de datos y los contenedores ya deben existir y deben configurarse correctamente.

Pasos siguientes

Puede obtener más información sobre la fuente de cambios de Azure Cosmo DB en los siguientes documentos:

Puede obtener más información sobre las operaciones masivas en el SDK de Java V4 con los siguientes documentos: