Streaming Databricks job failing while writing to ADLS after reading from Eventhub

Chand, Anupam SBOBNG-ITA/RX 451 Reputation points
2021-10-19T12:58:50.683+00:00

Hi,

We have an Event hub which is used to ingest data from an external sending application. We are reading data from the Eventhub using Azure databricks using Pyspark. We are attempting to read the data from 1 consumer group and write it to different folders on ADLS depending on which partition it came in on. The message on each of the partitions will have it's own schema which we have defined in the code. This setup is working fine for a few minutes and then the streaming job fails.

We tried having the processing of only 1 partition and this was running fine which tells me that there is something not right with the way we are using 1 streaming dataset and writing to 4 sinks.

Can someone help look at this and give us some suggestions of what we can try?

The extract of our code is below

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from datetime import datetime
from pyspark.sql.window import Window

connectionString = "Endpoint=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ehConf = {}
ehConf['eventhubs.connectionString'] = connectionString
ehConf['eventhubs.consumerGroup'] = "$Default"
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

df = spark.readStream.format("eventhubs").options(**ehConf).load()

decoded_df_Optin =df.select(F.from_json(F.col("body").cast("string"),events_schema_Optin).alias("Payload_Optin"), F.col("partition").alias("Partition"))
decoded_df_Optin= decoded_df_Optin.filter(decoded_df_Optin.Partition=="0")
decoded_df_Update =df.select(F.from_json(F.col("body").cast("string"),events_schema_Update).alias("Payload_Update"), F.col("partition").alias("Partition"))
decoded_df_Update= decoded_df_Update.filter(decoded_df_Update.Partition=="1")
decoded_df_Transaction =df.select(F.from_json(F.col("body").cast("string"),events_schema_Transaction).alias("Payload_Transaction"), F.col("partition").alias("Partition"))
decoded_df_Transaction= decoded_df_Transaction.filter(decoded_df_Transaction.Partition=="2")

df_out_optin = df_events_optin_column_renamed.coalesce(1)\
 .writeStream\
  .trigger(processingTime='1 minute') \
  .outputMode("append")\
  .format("json")\
  .option("path", Optin_TempPath)\
  .option("checkpointLocation", Optin_CheckpointPath)\
  .start()

df_out_update = df_events_update_column_renamed.coalesce(1)\
  .writeStream\
   .trigger(processingTime='1 minute') \
   .outputMode("append")\
   .format("json")\
   .option("path", Update_TempPath)\
   .option("checkpointLocation", Update_CheckpointPath)\
   .start()

 df_out_transaction = df_events_Transaction_column_renamed.coalesce(1)\
  .writeStream\
   .trigger(processingTime='1 minute') \
   .outputMode("append")\
   .format("json")\
  .option("path", Transaction_TempPath)\
   .option("checkpointLocation", Transaction_CheckpointPath)\
   .start()

The error log looks like this.

    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:289)
    at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:606)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:604)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:604)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$4(MicroBatchExecution.scala:243)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:647)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:240)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:209)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:203)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:366)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 9) (10.139.64.4 executor driver): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:368)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:266)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-driver-11' with higher epoch of '0' is created hence current receiver 'spark-driver-9' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:6b20ec1f0000167e00003d60616e3fe8_G11S2_B0S3, SystemTracker:anueventhubns:eventhub:anueventhub~1023|$default, Timestamp:2021-10-19T03:48:02, errorContext[NS: anueventhubns.servicebus.windows.net, PATH: anueventhub/ConsumerGroups/$Default/Partitions/0, REFERENCE_ID: LN_2374de_1634615267453_a_G11S2, PREFETCH_COUNT: 500, LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at com.microsoft.azure.eventhubs.impl.ExceptionUtil.completeExceptionally(ExceptionUtil.java:116)
    at com.microsoft.azure.eventhubs.impl.MessageReceiver.drainPendingReceives(MessageReceiver.java:505)
    at com.microsoft.azure.eventhubs.impl.MessageReceiver.onError(MessageReceiver.java:490)
    at com.microsoft.azure.eventhubs.impl.MessageReceiver.onClose(MessageReceiver.java:790)
    at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.processOnClose(BaseLinkHandler.java:73)
    at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.handleRemoteLinkClosed(BaseLinkHandler.java:109)
    at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:51)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
    at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    ... 3 more
Caused by: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-driver-11' with higher epoch of '0' is created hence current receiver 'spark-driver-9' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:6b20ec1f0000167e00003d60616e3fe8_G11S2_B0S3, SystemTracker:anueventhubns:eventhub:anueventhub~1023|$default, Timestamp:2021-10-19T03:48:02, errorContext[NS: anueventhubns.servicebus.windows.net, PATH: anueventhub/ConsumerGroups/$Default/Partitions/0, REFERENCE_ID: LN_2374de_1634615267453_a_G11S2, PREFETCH_COUNT: 500, LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
    at com.microsoft.azure.eventhubs.impl.ExceptionUtil.toException(ExceptionUtil.java:43)
    at com.microsoft.azure.eventhubs.impl.MessageReceiver.onClose(MessageReceiver.java:789)
    ... 15 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2765)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2712)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2706)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2706)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1255)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1255)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1255)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2914)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2902)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2446)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2429)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:256)
    ... 28 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:368)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:266)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-driver-11' with higher epoch of '0' is created hence current receiver 'spark-driver-9' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:6b20ec1f0000167e00003d60616e3fe8_G11S2_B0S3, SystemTracker:anueventhubns:eventhub:anueventhub~1023|$default, Timestamp:2021-10-19T03:48:02, errorContext[NS: anueventhubns.servicebus.windows.net, PATH: anueventhub/ConsumerGroups/$Default/Partitions/0, REFERENCE_ID: LN_2374de_1634615267453_a_G11S2, PREFETCH_COUNT: 500, LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at com.microsoft.azure.eventhubs.impl.ExceptionUtil.completeExceptionally(ExceptionUtil.java:116)
    at com.microsoft.azure.eventhubs.impl.MessageReceiver.drainPendingReceives(MessageReceiver.java:505)
    at com.microsoft.azure.eventhubs.impl.MessageReceiver.onError(MessageReceiver.java:490)
    at com.microsoft.azure.eventhubs.impl.MessageReceiver.onClose(MessageReceiver.java:790)
    at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.processOnClose(BaseLinkHandler.java:73)
    at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.handleRemoteLinkClosed(BaseLinkHandler.java:109)
    at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:51)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
    at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    ... 3 more
Caused by: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-driver-11' with higher epoch of '0' is created hence current receiver 'spark-driver-9' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:6b20ec1f0000167e00003d60616e3fe8_G11S2_B0S3, SystemTracker:anueventhubns:eventhub:anueventhub~1023|$default, Timestamp:2021-10-19T03:48:02, errorContext[NS: anueventhubns.servicebus.windows.net, PATH: anueventhub/ConsumerGroups/$Default/Partitions/0, REFERENCE_ID: LN_2374de_1634615267453_a_G11S2, PREFETCH_COUNT: 500, LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
    at com.microsoft.azure.eventhubs.impl.ExceptionUtil.toException(ExceptionUtil.java:43)
    at com.microsoft.azure.eventhubs.impl.MessageReceiver.onClose(MessageReceiver.java:789)
    ... 15 more
Azure Event Hubs
Azure Event Hubs
An Azure real-time data ingestion service.
556 questions
Azure Databricks
Azure Databricks
An Apache Spark-based analytics platform optimized for Azure.
1,917 questions
{count} votes

Accepted answer
  1. Chand, Anupam SBOBNG-ITA/RX 451 Reputation points
    2021-11-04T01:43:37.697+00:00

    I found the answer in this LINK.
    The issue is with having multiple output streaming sinks with 1 input datastream. Writing into each sink causes the streaming dataframe to be recomputed.

    The solution says to use the foreachbatch or foreach. When I used the foreachbatch, my pyspark code started working as expected even with high load.
    The spark doumentation is HERE and a Pyspark example is HERE which is exactly what I used.

    Hope this is helpful to anyone else who faces the same problem.

    1 person found this answer helpful.

1 additional answer

Sort by: Most helpful
  1. MartinJaffer-MSFT 26,021 Reputation points
    2021-10-20T16:13:22.367+00:00

    Hello anonymous user and welcome to Microsoft Q&A.

    From the error message, I concur with your assessment, but have not yet found anything obvious in the code you shared. I expect the problem to be near where the imported SparkSession is used. (I don't see SparkSession unsed in code?)

    Caused by: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-driver-11' with higher epoch of '0' is created hence current receiver 'spark-driver-9' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used.

    The error message sounds like "epoch" is used to determine seniority of the receiver sessions, and should be unique.

    0 comments No comments