Hello everyone,
I am trying to create a Kafka Connect sink connector using "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector" to sync Kafka topic data to an Azure Cosmos DB NoSQL container. Reference: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/kafka-connector-sink
Following is one of the records in the Kafka topic (Avro-serialized):
{
  "id": "id-1667303121786",
  "name": "name-1667303121786",
  "age": "age-1667303121786"
}
and the following is the schema registered in the schema registry:
{
  "subject": "TOPIC-value",
  "version": 1,
  "id": 177692,
  "schema": "{\"type\":\"record\",\"name\":\"TOPIC\",\"namespace\":\"com.company.kafka\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
}
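For readability, the escaped `schema` string above decodes to the following Avro record schema:

```json
{
  "type": "record",
  "name": "TOPIC",
  "namespace": "com.company.kafka",
  "fields": [
    { "name": "id", "type": ["null", "string"] },
    { "name": "name", "type": ["null", "string"], "default": null },
    { "name": "age", "type": ["null", "string"], "default": null }
  ]
}
```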
Following is the connector configuration:
{
  "name": "cosmos-sinkconnector-TOPIC",
  "config": {
    "name": "cosmos-sinkconnector-TOPIC",
    "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
    "topics": "TOPIC",
    "producer.override.client.id": "producer-client-v1",
    "consumer.override.group.id": "consumer-group-v1",
    "connect.cosmos.connection.endpoint": "https://account.documents.azure.com:443/",
    "tasks.max": "1",
    "batch.size": "1",
    "connect.cosmos.containers.topicmap": "TOPIC#TOPIC",
    "cosmos.id.strategy": "FullKeyStrategy",
    "connect.cosmos.master.key": "master key hash",
    "key.converter.schemas.enable": "false",
    "value.converter.schema.registry.url": "https://schemaregistry",
    "value.converter.schemas.enable": "false",
    "connect.cosmos.databasename": "cosmossql-db",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://schemaregistry"
  }
}
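One thing I am not sure about in the configuration above: if I read the connector documentation correctly, the id strategy property is spelled `connect.cosmos.sink.id.strategy` and takes a fully qualified class name rather than the short `cosmos.id.strategy` form I used, i.e. something like (taken from the docs, not verified in my setup):

```json
{
  "connect.cosmos.sink.id.strategy": "com.azure.cosmos.kafka.connect.sink.id.strategy.FullKeyStrategy"
}
```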
The connector task fails with the following exception:
2022-11-01 15:10:49,194 - ERROR [task-thread-cosmos-sinkconnector-TOPIC-0:WorkerTask@191] - WorkerSinkTask{id=cosmos-sinkconnector-TOPIC-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: Argument 'recordValue' is not valid map format.
at com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument(Preconditions.java:136)
at com.azure.cosmos.kafka.connect.sink.BulkWriter.getPartitionKeyValue(BulkWriter.java:103)
at com.azure.cosmos.kafka.connect.sink.BulkWriter.writeCore(BulkWriter.java:62)
at com.azure.cosmos.kafka.connect.sink.SinkWriterBase.write(SinkWriterBase.java:26)
at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:123)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
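From the stack trace, the bulk writer appears to require the already-converted record value to be a `java.util.Map` before extracting the partition key, while the Avro converter hands it a Connect `Struct`. Below is a minimal plain-Java sketch of that kind of precondition check; it is my reconstruction for illustration, not the connector's actual code, and `getPartitionKeyValue` here is a hypothetical stand-in:

```java
import java.util.HashMap;
import java.util.Map;

public class RecordValueCheck {
    // Mimics the Guava Preconditions.checkArgument call seen in the stack trace.
    static void checkArgument(boolean expression, String message) {
        if (!expression) {
            throw new IllegalArgumentException(message);
        }
    }

    // Hypothetical stand-in for BulkWriter.getPartitionKeyValue: the check only
    // passes when the record value is a Map, so a Connect Struct would fail it.
    static Object getPartitionKeyValue(Object recordValue, String partitionKeyPath) {
        checkArgument(recordValue instanceof Map, "Argument 'recordValue' is not valid map format.");
        return ((Map<?, ?>) recordValue).get(partitionKeyPath);
    }

    public static void main(String[] args) {
        // A Map value passes the check.
        Map<String, Object> asMap = new HashMap<>();
        asMap.put("id", "id-1667303121786");
        System.out.println(getPartitionKeyValue(asMap, "id"));

        // Any non-Map value (e.g. a Connect Struct, simulated here by a plain
        // Object) triggers exactly the exception message from the log.
        try {
            getPartitionKeyValue(new Object(), "id");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```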
When I used "org.apache.kafka.connect.file.FileStreamSinkConnector" to dump the topic data to a file for inspection, the file content looks like this:
Struct{id=id-1667303121786,name=name-1667303121786,age=age-1667303121786}
Struct{id=id-1666958886850,name=name-1666958886850,age=age-1666958886850}
Struct{id=id-1666958891892,name=name-1666958891892,age=age-1666958891892}
Struct{id=id-1666958896935,name=name-1666958896935,age=age-1666958896935}
Struct{id=id-1666958901970,name=name-1666958901970,age=age-1666958901970}
Struct{id=id-1666958907001,name=name-1666958907001,age=age-1666958907001}
Struct{id=id-1666958912040,name=name-1666958912040,age=age-1666958912040}
Struct{id=id-1666958917109,name=name-1666958917109,age=age-1666958917109}
I searched the documentation extensively for a solution to the exception "Argument 'recordValue' is not valid map format." when writing data to a Cosmos DB NoSQL container, but could not find a working one.
On the other hand, the Cosmos DB sink connector documentation on supported data types lists STRUCT as a supported type, yet the connector still throws the exception. Reference: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/kafka-connector-sink#supported-data-types
Could anyone please help me figure out what I am doing wrong, or point me to an SMT I need to apply to convert the STRUCT to a MAP?
Thanks in advance for your help.