Hi,
We have an Event Hub that ingests data from an external sending application, and we read from it in Azure Databricks using PySpark Structured Streaming. We are attempting to read from one consumer group and write the data to different folders on ADLS depending on which partition it came in on. The messages on each partition have their own schema, which we have defined in the code. This setup works fine for a few minutes and then the streaming job fails.
We tried processing only one partition and that ran fine, which tells me something is not right with the way we are reading one streaming dataset and writing it to four sinks.
Can someone take a look and suggest what we could try?
An extract of our code is below:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F   # the code below uses F.from_json / F.col
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime

connectionString = "Endpoint=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

ehConf = {}
# encrypt the connection string for the Event Hubs connector
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
ehConf['eventhubs.consumerGroup'] = "$Default"
df = spark.readStream.format("eventhubs").options(**ehConf).load()
decoded_df_Optin = df.select(
    F.from_json(F.col("body").cast("string"), events_schema_Optin).alias("Payload_Optin"),
    F.col("partition").alias("Partition"))
decoded_df_Optin = decoded_df_Optin.filter(decoded_df_Optin.Partition == "0")

decoded_df_Update = df.select(
    F.from_json(F.col("body").cast("string"), events_schema_Update).alias("Payload_Update"),
    F.col("partition").alias("Partition"))
decoded_df_Update = decoded_df_Update.filter(decoded_df_Update.Partition == "1")

decoded_df_Transaction = df.select(
    F.from_json(F.col("body").cast("string"), events_schema_Transaction).alias("Payload_Transaction"),
    F.col("partition").alias("Partition"))
decoded_df_Transaction = decoded_df_Transaction.filter(decoded_df_Transaction.Partition == "2")
df_out_optin = (df_events_optin_column_renamed.coalesce(1)
    .writeStream
    .trigger(processingTime='1 minute')
    .outputMode("append")
    .format("json")
    .option("path", Optin_TempPath)
    .option("checkpointLocation", Optin_CheckpointPath)
    .start())

df_out_update = (df_events_update_column_renamed.coalesce(1)
    .writeStream
    .trigger(processingTime='1 minute')
    .outputMode("append")
    .format("json")
    .option("path", Update_TempPath)
    .option("checkpointLocation", Update_CheckpointPath)
    .start())

df_out_transaction = (df_events_Transaction_column_renamed.coalesce(1)
    .writeStream
    .trigger(processingTime='1 minute')
    .outputMode("append")
    .format("json")
    .option("path", Transaction_TempPath)
    .option("checkpointLocation", Transaction_CheckpointPath)
    .start())
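One variation we are open to trying, but have not tested, is keeping a single streaming query and fanning each micro-batch out to the three folders inside foreachBatch, roughly as sketched below. The schemas and output paths are the same placeholders as in the extract above, and Combined_CheckpointPath is a hypothetical new checkpoint location. Would this avoid whatever is going wrong with the three sinks?

# Untested sketch: one query, fan-out inside foreachBatch.
# Combined_CheckpointPath is hypothetical; schemas and output paths are the
# placeholders from the extract above.
def write_partitions(batch_df, batch_id):
    batch_df.persist()  # the batch is reused three times below
    (batch_df.filter(F.col("partition") == "0")
        .select(F.from_json(F.col("body").cast("string"), events_schema_Optin).alias("Payload_Optin"))
        .coalesce(1).write.mode("append").json(Optin_TempPath))
    (batch_df.filter(F.col("partition") == "1")
        .select(F.from_json(F.col("body").cast("string"), events_schema_Update).alias("Payload_Update"))
        .coalesce(1).write.mode("append").json(Update_TempPath))
    (batch_df.filter(F.col("partition") == "2")
        .select(F.from_json(F.col("body").cast("string"), events_schema_Transaction).alias("Payload_Transaction"))
        .coalesce(1).write.mode("append").json(Transaction_TempPath))
    batch_df.unpersist()

df_out = (df.writeStream
    .trigger(processingTime='1 minute')
    .option("checkpointLocation", Combined_CheckpointPath)
    .foreachBatch(write_partitions)
    .start())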
The error log looks like this:
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:289)
at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:606)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:604)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:604)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$4(MicroBatchExecution.scala:243)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:647)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:240)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:209)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:203)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:366)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 9) (10.139.64.4 executor driver): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:368)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:266)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-driver-11' with higher epoch of '0' is created hence current receiver 'spark-driver-9' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:6b20ec1f0000167e00003d60616e3fe8_G11S2_B0S3, SystemTracker:anueventhubns:eventhub:anueventhub~1023|$default, Timestamp:2021-10-19T03:48:02, errorContext[NS: anueventhubns.servicebus.windows.net, PATH: anueventhub/ConsumerGroups/$Default/Partitions/0, REFERENCE_ID: LN_2374de_1634615267453_a_G11S2, PREFETCH_COUNT: 500, LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at com.microsoft.azure.eventhubs.impl.ExceptionUtil.completeExceptionally(ExceptionUtil.java:116)
at com.microsoft.azure.eventhubs.impl.MessageReceiver.drainPendingReceives(MessageReceiver.java:505)
at com.microsoft.azure.eventhubs.impl.MessageReceiver.onError(MessageReceiver.java:490)
at com.microsoft.azure.eventhubs.impl.MessageReceiver.onClose(MessageReceiver.java:790)
at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.processOnClose(BaseLinkHandler.java:73)
at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.handleRemoteLinkClosed(BaseLinkHandler.java:109)
at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:51)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
... 3 more
Caused by: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-driver-11' with higher epoch of '0' is created hence current receiver 'spark-driver-9' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:6b20ec1f0000167e00003d60616e3fe8_G11S2_B0S3, SystemTracker:anueventhubns:eventhub:anueventhub~1023|$default, Timestamp:2021-10-19T03:48:02, errorContext[NS: anueventhubns.servicebus.windows.net, PATH: anueventhub/ConsumerGroups/$Default/Partitions/0, REFERENCE_ID: LN_2374de_1634615267453_a_G11S2, PREFETCH_COUNT: 500, LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
at com.microsoft.azure.eventhubs.impl.ExceptionUtil.toException(ExceptionUtil.java:43)
at com.microsoft.azure.eventhubs.impl.MessageReceiver.onClose(MessageReceiver.java:789)
... 15 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2765)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2712)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2706)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2706)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1255)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1255)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1255)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2914)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2902)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2446)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2429)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:256)
... 28 more
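Looking at the ReceiverDisconnectedException, it seems that our three queries each open receivers against the same partitions of the $Default consumer group, and the newer receivers keep disconnecting the older ones. Would giving each streaming query its own consumer group be the right direction? Something like the sketch below, where the consumer group names are hypothetical and would first have to be created on the Event Hub:

# Hypothetical: one consumer group per query so the receivers stop competing
# for the same partitions. The group names below do not exist yet.
def make_conf(consumer_group):
    return {
        'eventhubs.connectionString':
            sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
        'eventhubs.consumerGroup': consumer_group,
    }

df_optin = spark.readStream.format("eventhubs").options(**make_conf("cg-optin")).load()
df_update = spark.readStream.format("eventhubs").options(**make_conf("cg-update")).load()
df_transaction = spark.readStream.format("eventhubs").options(**make_conf("cg-transaction")).load()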