Writing CosmosDb Data On Kafka Topic | Databricks
Tomar, Abhishek
6
Reputation points
I have created a connection with cosmos DB using cosmos spark oltp 3 connector and now I am trying to stream my datagram on Kafka topic, please help.
CosmosDB OLTP SPARK 3 Connection
stremDataFram= (spark.readStream
.format("cosmos.oltp.changeFeed")
.option("spark.cosmos.container", PARM_CONTAINER_NAME)
.option("spark.cosmos.read.inferSchema.enabled", "true")
.option("spark.cosmos.changeFeed.startFrom", "Beginning")
.option("spark.cosmos.changeFeed.mode", "Incremental")
.option("path", "/mnt/ticstorageaccoun")
.options(**cfg)
.load())
stremDataFram.createOrReplaceGlobalTempView('StreamTable')
Writing Datafram on Stream
rawData = (dfStream
.selectExpr("CAST(partition_key AS STRING)", "CAST(value AS STRING)")
.writeStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server)
.option("topic", topic_name)
.option("subscribe", "hasif_test")
.option("spark.executor.extraJavaOptions", "-Djava.security.auth.login.config=jaas.conf")\
.option("kafka.security.protocol", "SASL_SSL")
.option("ssl.truststore.location", truststore_loc)
.option("ssl.truststore.password",truststore_pwd)
.option("ssl.keystore.location", keystore_loc)
.option("ssl.keystore.password", keystore_pwd)
.option("checkpointLocation", "/mnt/ticstorageaccoun/check")
.start())
ERROR
org.apache.spark.SparkException: Writing job aborted
at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:717)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:403)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:347)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:295)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:307)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:68)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:87)
at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:75)
at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:62)
at org.apache.spark.sql.execution.ResultCacheManager.collectResult$1(ResultCacheManager.scala:575)
at org.apache.spark.sql.execution.ResultCacheManager.computeResult(ResultCacheManager.scala:582)
at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:528)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:527)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:424)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:403)
at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:422)
at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3153)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3959)
at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3120)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3951)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:3120)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:718)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:713)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:301)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:299)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:713)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$5(MicroBatchExecution.scala:283)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:809)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:280)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:301)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:299)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:239)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:233)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:359)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:323)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:250)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 81 in stage 13.0 failed 4 times, most recent failure: Lost task 81.3 in stage 13.0 (TID 388) (10.139.64.6 executor 1): kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:274)
at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.createKafkaProducer(InternalKafkaProducerPool.scala:136)
at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.$anonfun$acquire$1(InternalKafkaProducerPool.scala:83)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.acquire(InternalKafkaProducerPool.scala:82)
at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool$.acquire(InternalKafkaProducerPool.scala:198)
at org.apache.spark.sql.kafka010.KafkaDataWriter.write(KafkaDataWriter.scala:53)
at org.apache.spark.sql.kafka010.KafkaDataWriter.write(KafkaDataWriter.scala:42)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:436)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1715)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:474)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:375)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:95)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at kafkashaded.org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)
at kafkashaded.org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)
at kafkashaded.org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:82)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:167)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)
at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
... 31 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2634)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
... 57 more
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:274)
at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.createKafkaProducer(InternalKafkaProducerPool.scala:136)
at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.$anonfun$acquire$1(InternalKafkaProducerPool.scala:83)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool.acquire(InternalKafkaProducerPool.scala:82)
at org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPool$.acquire(InternalKafkaProducerPool.scala:198)
at org.apache.spark.sql.kafka010.KafkaDataWriter.write(KafkaDataWriter.scala:53)
at org.apache.spark.sql.kafka010.KafkaDataWriter.write(KafkaDataWriter.scala:42)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:436)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1715)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:474)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:375)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:95)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at kafkashaded.org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)
at kafkashaded.org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)
at kafkashaded.org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:82)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:167)
at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)
at kafkashaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
... 31 more