Writing Cosmos DB Data to a Kafka Topic | Databricks

Tomar, Abhishek 6 Reputation points
2022-09-27T10:44:12.613+00:00

I have created a connection to Cosmos DB using the Cosmos DB OLTP Spark 3 connector, and now I am trying to stream my DataFrame to a Kafka topic. Please help.

Cosmos DB OLTP Spark 3 Connection

streamDataFrame = (spark.readStream
    .format("cosmos.oltp.changeFeed")                          # Cosmos DB change feed source
    .option("spark.cosmos.container", PARM_CONTAINER_NAME)
    .option("spark.cosmos.read.inferSchema.enabled", "true")
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")  # replay the change feed from the start
    .option("spark.cosmos.changeFeed.mode", "Incremental")
    .option("path", "/mnt/ticstorageaccoun")
    .options(**cfg)                                            # account endpoint, key, database, etc.
    .load())
streamDataFrame.createOrReplaceGlobalTempView('StreamTable')
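
The `cfg` dict referenced above isn't shown in the post. For context, a minimal sketch of what it would typically hold for the Cosmos DB Spark 3 OLTP connector — the endpoint, secret scope, and database name below are hypothetical placeholders:

cfg = {
    "spark.cosmos.accountEndpoint": "https://<your-account>.documents.azure.com:443/",   # hypothetical endpoint
    "spark.cosmos.accountKey": dbutils.secrets.get(scope="kv-scope", key="cosmos-key"),  # hypothetical secret scope/key
    "spark.cosmos.database": "<your-database>",                                          # hypothetical database name
    "spark.cosmos.container": PARM_CONTAINER_NAME,
}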

Writing the Streaming DataFrame to Kafka

rawData = (streamDataFrame
            .selectExpr("CAST(partition_key AS STRING)", "CAST(value AS STRING)")
            .writeStream.format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_server)
            .option("topic", topic_name)
            .option("subscribe", "hasif_test")
            .option("spark.executor.extraJavaOptions", "-Djava.security.auth.login.config=jaas.conf")
            .option("kafka.security.protocol", "SASL_SSL")
            .option("ssl.truststore.location", truststore_loc)
            .option("ssl.truststore.password", truststore_pwd)
            .option("ssl.keystore.location", keystore_loc)
            .option("ssl.keystore.password", keystore_pwd)
            .option("checkpointLocation", "/mnt/ticstorageaccoun/check")
            .start())
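
Several of these options likely never reach the Kafka producer: the Kafka sink expects columns named exactly `key` and `value`, `subscribe` is a source-side option with no meaning on a writer, `spark.executor.extraJavaOptions` is a cluster-level setting that cannot be applied per query, and SSL settings only take effect when prefixed with `kafka.`. A sketch of the writer with those points addressed, passing the JAAS entry inline via `kafka.sasl.jaas.config` — the SASL mechanism and the credential variables are assumptions to adapt to your broker:

query = (streamDataFrame
            # The Kafka sink requires columns named exactly `key` and `value`.
            .selectExpr("CAST(partition_key AS STRING) AS key",
                        "CAST(value AS STRING) AS value")
            .writeStream.format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_server)
            .option("topic", topic_name)
            .option("kafka.security.protocol", "SASL_SSL")
            # Assumption: PLAIN SASL; use SCRAM/GSSAPI if your brokers require it.
            .option("kafka.sasl.mechanism", "PLAIN")
            # On Databricks the Kafka client is shaded, so the login module class
            # usually needs the kafkashaded. package prefix (matching the frames in
            # the trace below). kafka_user/kafka_pwd are hypothetical variables.
            .option("kafka.sasl.jaas.config",
                    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
                    f"required username='{kafka_user}' password='{kafka_pwd}';")
            # Client-side SSL settings only take effect with the kafka. prefix.
            .option("kafka.ssl.truststore.location", truststore_loc)
            .option("kafka.ssl.truststore.password", truststore_pwd)
            .option("kafka.ssl.keystore.location", keystore_loc)
            .option("kafka.ssl.keystore.password", keystore_pwd)
            .option("checkpointLocation", "/mnt/ticstorageaccoun/check")
            .start())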

ERROR
org.apache.spark.SparkException: Writing job aborted

at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:717)  
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:403)  
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:347)  
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:295)  
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:307)  
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)  
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)  
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)  
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:68)  
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:87)  
	at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:75)  
	at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:62)  
	at org.apache.spark.sql.execution.ResultCacheManager.collectResult$1(ResultCacheManager.scala:575)  
	at org.apache.spark.sql.execution.ResultCacheManager.computeResult(ResultCacheManager.scala:582)  
	at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:528)  
	at scala.Option.getOrElse(Option.scala:189)  
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:527)  
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:424)  
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:403)  
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:422)  
	at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3153)  
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3959)  
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3120)  
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3951)  
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)  
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)  
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)  
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)  
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)  
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)  
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)  
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:3120)  
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:718)  
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)  
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)  
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)  
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)  
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)  
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)  
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:713)  
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:301)  
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:299)  
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)  
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:713)  
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$5(MicroBatchExecution.scala:283)  
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:809)  
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:280)  
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)  
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:301)  
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:299)  
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)  
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:239)  
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)  
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:233)  
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:359)  
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)  
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)  
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:323)  
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:250)  
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 81 in stage 13.0 failed 4 times, most recent failure: Lost task 81.3 in stage 13.0 (TID 388) (10.139.64.6 executor 1): kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka producer  
	at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)  
	at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)  
	at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:274)  
	at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.createKafkaProducer(InternalKafkaProducerPool.scala:136)  
	at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.$anonfun$acquire$1(InternalKafkaProducerPool.scala:83)  
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)  
	at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.acquire(InternalKafkaProducerPool.scala:82)  
	at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool$.acquire(InternalKafkaProducerPool.scala:198)  
	at org.apache.spark.sql.kafka010.KafkaDataWriter.write(KafkaDataWriter.scala:53)  
	at org.apache.spark.sql.kafka010.KafkaDataWriter.write(KafkaDataWriter.scala:42)  
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:436)  
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1715)  
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:474)  
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:375)  
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)  
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)  
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)  
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)  
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)  
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)  
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)  
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)  
	at org.apache.spark.scheduler.Task.run(Task.scala:95)  
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)  
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)  
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)  
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)  
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)  
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)  
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)  
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)  
	at java.lang.Thread.run(Thread.java:748)  
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set  
	at kafkashaded.org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)  
	at kafkashaded.org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)  
	at kafkashaded.org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:82)  
	at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:167)  
	at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)  
	at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)  
	at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)  
	at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)  
	... 31 more  
  
Driver stacktrace:  
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)  
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)  
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)  
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)  
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)  
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)  
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)  
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)  
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)  
	at scala.Option.foreach(Option.scala:407)  
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)  
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)  
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)  
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)  
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)  
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)  
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)  
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2634)  
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)  
	... 57 more  
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka producer  
	at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)  
	at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)  
	at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:274)  
	at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.createKafkaProducer(InternalKafkaProducerPool.scala:136)  
	at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.$anonfun$acquire$1(InternalKafkaProducerPool.scala:83)  
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)  
	at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.acquire(InternalKafkaProducerPool.scala:82)  
	at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool$.acquire(InternalKafkaProducerPool.scala:198)  
	at org.apache.spark.sql.kafka010.KafkaDataWriter.write(KafkaDataWriter.scala:53)  
	at org.apache.spark.sql.kafka010.KafkaDataWriter.write(KafkaDataWriter.scala:42)  
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:436)  
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1715)  
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:474)  
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:375)  
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)  
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)  
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)  
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)  
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)  
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)  
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)  
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)  
	at org.apache.spark.scheduler.Task.run(Task.scala:95)  
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)  
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)  
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)  
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)  
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)  
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)  
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)  
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)  
	at java.lang.Thread.run(Thread.java:748)  
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set  
	at kafkashaded.org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)  
	at kafkashaded.org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)  
	at kafkashaded.org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:82)  
	at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:167)  
	at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)  
	at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)  
	at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)  
	at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)  
	... 31 more  
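
The root cause is the final `Caused by`: the producer looked for a `KafkaClient` entry in a JAAS configuration, but the `java.security.auth.login.config` system property was never set. Passing `spark.executor.extraJavaOptions` as a `.option(...)` on the writer has no effect, because executor JVM flags must be in place before the executors start. If the file-based JAAS approach is preferred over the inline `kafka.sasl.jaas.config` option sketched above, one possible setup (paths and credentials are hypothetical) is to place `jaas.conf` on DBFS and reference it in the cluster's Spark config:

spark.driver.extraJavaOptions -Djava.security.auth.login.config=/dbfs/FileStore/jaas.conf
spark.executor.extraJavaOptions -Djava.security.auth.login.config=/dbfs/FileStore/jaas.conf

with a `jaas.conf` along these lines (the shaded class name is an assumption based on the `kafkashaded.` frames in the trace):

KafkaClient {
  kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<username>"
  password="<password>";
};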
