Kafka to CosmosDB NOSQL API, Kafka Connect exception - "Caused by: java.lang.IllegalArgumentException: Argument 'recordValue' is not valid map format."

Madhab Chandra Pal 21 Reputation points
2022-11-01T15:23:53.84+00:00

Hello everyone,

I am trying to create a Kafka Connect sink connector using "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector" to sync Kafka topic data to an Azure Cosmos DB NoSQL container. Reference: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/kafka-connector-sink

Following is one of the records in the Kafka topic, Avro serialized:

{
  "id": "id-1667303121786",
  "name": "name-1667303121786",
  "age": "age-1667303121786"
}

and following is the schema:

{
  "subject": "TOPIC-value",
  "version": 1,
  "id": 177692,
  "schema": "{\"type\":\"record\",\"name\":\"TOPIC\",\"namespace\":\"com.company.kafka\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
}
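
For readability, the escaped "schema" string above decodes to the following Avro record schema:

{
  "type": "record",
  "name": "TOPIC",
  "namespace": "com.company.kafka",
  "fields": [
    { "name": "id", "type": ["null", "string"] },
    { "name": "name", "type": ["null", "string"], "default": null },
    { "name": "age", "type": ["null", "string"], "default": null }
  ]
}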

Following is the connector configuration:

{
  "name": "cosmos-sinkconnector-TOPIC",
  "config": {
    "name": "cosmos-sinkconnector-TOPIC",
    "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
    "topics": "TOPIC",
    "tasks.max": "1",
    "batch.size": "1",
    "producer.override.client.id": "producer-client-v1",
    "consumer.override.group.id": "consumer-group-v1",
    "connect.cosmos.connection.endpoint": "https://account.documents.azure.com:443/",
    "connect.cosmos.master.key": "master key hash",
    "connect.cosmos.databasename": "cosmossql-db",
    "connect.cosmos.containers.topicmap": "TOPIC#TOPIC",
    "cosmos.id.strategy": "FullKeyStrategy",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://schemaregistry",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://schemaregistry",
    "value.converter.schemas.enable": "false"
  }
}

The connector worker task fails with the following exception:

2022-11-01 15:10:49,194 - ERROR [task-thread-cosmos-sinkconnector-TOPIC-0:WorkerTask@191] - WorkerSinkTask{id=cosmos-sinkconnector-TOPIC-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: Argument 'recordValue' is not valid map format.
at com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument(Preconditions.java:136)
at com.azure.cosmos.kafka.connect.sink.BulkWriter.getPartitionKeyValue(BulkWriter.java:103)
at com.azure.cosmos.kafka.connect.sink.BulkWriter.writeCore(BulkWriter.java:62)
at com.azure.cosmos.kafka.connect.sink.SinkWriterBase.write(SinkWriterBase.java:26)
at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:123)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more

When I used "org.apache.kafka.connect.file.FileStreamSinkConnector" to dump the topic data to a file for inspection (a sketch of that connector's configuration follows the output), the content of the file looked like this:
Struct{id=id-1667303121786,name=name-1667303121786,age=age-1667303121786}
Struct{id=id-1666958886850,name=name-1666958886850,age=age-1666958886850}
Struct{id=id-1666958891892,name=name-1666958891892,age=age-1666958891892}
Struct{id=id-1666958896935,name=name-1666958896935,age=age-1666958896935}
Struct{id=id-1666958901970,name=name-1666958901970,age=age-1666958901970}
Struct{id=id-1666958907001,name=name-1666958907001,age=age-1666958907001}
Struct{id=id-1666958912040,name=name-1666958912040,age=age-1666958912040}
Struct{id=id-1666958917109,name=name-1666958917109,age=age-1666958917109}
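
A minimal sketch of that inspection connector's configuration, assuming the same Avro converter settings as the sink connector (the connector name and "file" path are illustrative):

{
  "name": "filestream-sink-TOPIC",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "TOPIC",
    "file": "/tmp/TOPIC-dump.txt",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://schemaregistry"
  }
}

The Struct{...} lines in the dump are the toString() form of org.apache.kafka.connect.data.Struct, which shows the Avro converter deserializes each record into a Connect Struct rather than a map.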

I searched the documentation extensively for a solution to the exception "Argument 'recordValue' is not valid map format." and for a way to write the data to the Cosmos DB NoSQL container, but could not find a working solution.
On the other hand, I checked the Cosmos DB sink connector documentation about supported data types and found that STRUCT is also a supported type, but the connector still throws the exception. Reference: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/kafka-connector-sink#supported-data-types

Could anyone please help me figure out the mistake I am making, or any SMT (Single Message Transform) I need to apply to convert the STRUCT to a MAP?

Thanks in advance for your help.


Accepted answer
  1. Oury Ba-MSFT 18,021 Reputation points Microsoft Employee
    2022-11-01T22:10:32.63+00:00

    Hi @Madhab Chandra Pal, thank you for posting your question on Microsoft Q&A and for using Azure Services. I am sorry to hear that you are having issues creating a Kafka sink connector using Azure Cosmos DB.

    There are currently some issues when customers use the Avro serializer; with the JsonSerializer they will not see the issue. Our product group is currently working on a fix. (I will share more details on the fix in this thread.)
    Please try the below workarounds and let me know if that is helpful.

    • Please check whether you can use the JsonSerializer - you will not see the exception (a converter sketch follows this list)
    • Also try version 1.0.5 and see whether you still see the issue
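
    For the first workaround, a minimal sketch of the converter settings, assuming the topic data is published as plain JSON so the Avro converter is not needed (endpoint, key, database, and container settings stay as in the question):

    {
      "name": "cosmos-sinkconnector-TOPIC",
      "config": {
        "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
        "topics": "TOPIC",
        "connect.cosmos.connection.endpoint": "https://account.documents.azure.com:443/",
        "connect.cosmos.master.key": "master key hash",
        "connect.cosmos.databasename": "cosmossql-db",
        "connect.cosmos.containers.topicmap": "TOPIC#TOPIC",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
      }
    }

    With the JsonConverter and schemas disabled, the record value reaches the sink as a plain map rather than a Connect Struct, which appears to be the shape the failing getPartitionKeyValue check expects.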

    There is a GitHub issue currently open: https://github.com/microsoft/kafka-connect-cosmosdb/issues/493

    Regards,
    Oury


1 additional answer

  1. Madhab Chandra Pal 21 Reputation points
    2022-11-04T13:23:53.257+00:00

    Hi @Oury Ba-MSFT,

    Thank you very much for your help and suggestions.

    Yes, both of your workarounds work.

    I was using the latest release of the CosmosDBSinkConnector plugin, v1.0.6, and it resulted in the issue. When I used 1.0.5, I was able to insert Avro-serialized Kafka topic data into Cosmos DB NoSQL.

    Here are my observations:

    1. The JsonSerializer works with both CosmosDBSinkConnector versions 1.0.5 and 1.0.6 (I did not go further down in versions).
    2. The AvroSerializer works with CosmosDBSinkConnector version 1.0.5.
    3. CosmosDBSinkConnector version 1.0.6 has the open issue with the AvroSerializer that breaks with "Caused by: java.lang.IllegalArgumentException: Argument 'recordValue' is not valid map format."
    4. In 1.0.6, neither "id.strategy" nor "cosmos.id.strategy" is honored by the connector; it always uses "com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInValueStrategy", even though I passed "com.azure.cosmos.kafka.connect.sink.id.strategy.FullKeyStrategy". Whereas 1.0.5 takes the value passed in "id.strategy" in the connector configuration (see the config fragment after this list).
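
    For observation 4, a fragment of the connector configuration showing the id strategy setting as honored by 1.0.5, per the behavior described above (all other settings as in the original question):

    {
      "config": {
        "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
        "id.strategy": "com.azure.cosmos.kafka.connect.sink.id.strategy.FullKeyStrategy"
      }
    }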

    Anyway, we are good with 1.0.5 for now, and it solves our problem. Please let us know when stable newer releases of the connector are available.

    Thank you.