anuthereaper asked · MartinJaffer-MSFT commented

Streaming Databricks job failing while writing to ADLS after reading from Event Hub

Hi,

We have an Event Hub that ingests data from an external sending application, and we read from it in Azure Databricks using PySpark. We read from one consumer group and write to different folders on ADLS depending on which partition a message arrived on. The messages on each partition have their own schema, which we have defined in the code. This setup runs fine for a few minutes and then the streaming job fails.

We tried processing only one partition and that ran fine, which tells me something is not right with the way we are using one streaming dataset and writing to four sinks.

Can someone take a look and suggest what we could try?

An extract of our code is below:

 from pyspark.sql import SparkSession
 from pyspark.sql.types import *
 from pyspark.sql.functions import *
 import pyspark.sql.functions as F
 from datetime import datetime
 from pyspark.sql.window import Window

 connectionString = "Endpoint=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
 ehConf = {}
 ehConf['eventhubs.consumerGroup'] = "$Default"
 ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

 # Single streaming source read from the Event Hub
 df = spark.readStream.format("eventhubs").options(**ehConf).load()

 # Decode the message body with a partition-specific schema, then filter by partition
 decoded_df_Optin = df.select(F.from_json(F.col("body").cast("string"), events_schema_Optin).alias("Payload_Optin"), F.col("partition").alias("Partition"))
 decoded_df_Optin = decoded_df_Optin.filter(decoded_df_Optin.Partition == "0")
 decoded_df_Update = df.select(F.from_json(F.col("body").cast("string"), events_schema_Update).alias("Payload_Update"), F.col("partition").alias("Partition"))
 decoded_df_Update = decoded_df_Update.filter(decoded_df_Update.Partition == "1")
 decoded_df_Transaction = df.select(F.from_json(F.col("body").cast("string"), events_schema_Transaction).alias("Payload_Transaction"), F.col("partition").alias("Partition"))
 decoded_df_Transaction = decoded_df_Transaction.filter(decoded_df_Transaction.Partition == "2")

 # (The column-renaming steps that produce the df_events_*_column_renamed DataFrames
 #  are omitted from this extract.)

 # One streaming sink per partition
 df_out_optin = df_events_optin_column_renamed.coalesce(1)\
     .writeStream\
     .trigger(processingTime='1 minute')\
     .outputMode("append")\
     .format("json")\
     .option("path", Optin_TempPath)\
     .option("checkpointLocation", Optin_CheckpointPath)\
     .start()

 df_out_update = df_events_update_column_renamed.coalesce(1)\
     .writeStream\
     .trigger(processingTime='1 minute')\
     .outputMode("append")\
     .format("json")\
     .option("path", Update_TempPath)\
     .option("checkpointLocation", Update_CheckpointPath)\
     .start()

 df_out_transaction = df_events_Transaction_column_renamed.coalesce(1)\
     .writeStream\
     .trigger(processingTime='1 minute')\
     .outputMode("append")\
     .format("json")\
     .option("path", Transaction_TempPath)\
     .option("checkpointLocation", Transaction_CheckpointPath)\
     .start()

The error log looks like this.

     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:289)
     at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:198)
     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:606)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
     at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:604)
     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:604)
     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$4(MicroBatchExecution.scala:243)
     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:647)
     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:240)
     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:209)
     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:203)
     at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:366)
     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
 Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 9) (10.139.64.4 executor driver): org.apache.spark.SparkException: Task failed while writing rows.
     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:368)
     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:266)
     at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
     at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
     at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
     at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
     at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
     at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
     at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
     at org.apache.spark.scheduler.Task.run(Task.scala:91)
     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815)
     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
     at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
 Caused by: java.util.concurrent.CompletionException: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-driver-11' with higher epoch of '0' is created hence current receiver 'spark-driver-9' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:6b20ec1f0000167e00003d60616e3fe8_G11S2_B0S3, SystemTracker:anueventhubns:eventhub:anueventhub~1023|$default, Timestamp:2021-10-19T03:48:02, errorContext[NS: anueventhubns.servicebus.windows.net, PATH: anueventhub/ConsumerGroups/$Default/Partitions/0, REFERENCE_ID: LN_2374de_1634615267453_a_G11S2, PREFETCH_COUNT: 500, LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
     at com.microsoft.azure.eventhubs.impl.ExceptionUtil.completeExceptionally(ExceptionUtil.java:116)
     at com.microsoft.azure.eventhubs.impl.MessageReceiver.drainPendingReceives(MessageReceiver.java:505)
     at com.microsoft.azure.eventhubs.impl.MessageReceiver.onError(MessageReceiver.java:490)
     at com.microsoft.azure.eventhubs.impl.MessageReceiver.onClose(MessageReceiver.java:790)
     at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.processOnClose(BaseLinkHandler.java:73)
     at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.handleRemoteLinkClosed(BaseLinkHandler.java:109)
     at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:51)
     at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
     at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
     at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
     at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
     at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
     ... 3 more
 Caused by: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-driver-11' with higher epoch of '0' is created hence current receiver 'spark-driver-9' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:6b20ec1f0000167e00003d60616e3fe8_G11S2_B0S3, SystemTracker:anueventhubns:eventhub:anueventhub~1023|$default, Timestamp:2021-10-19T03:48:02, errorContext[NS: anueventhubns.servicebus.windows.net, PATH: anueventhub/ConsumerGroups/$Default/Partitions/0, REFERENCE_ID: LN_2374de_1634615267453_a_G11S2, PREFETCH_COUNT: 500, LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
     at com.microsoft.azure.eventhubs.impl.ExceptionUtil.toException(ExceptionUtil.java:43)
     at com.microsoft.azure.eventhubs.impl.MessageReceiver.onClose(MessageReceiver.java:789)
     ... 15 more
    
 Driver stacktrace:
     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2765)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2712)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2706)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2706)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1255)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1255)
     at scala.Option.foreach(Option.scala:407)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1255)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2973)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2914)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2902)
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028)
     at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2446)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2429)
     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:256)
     ... 28 more
 Caused by: org.apache.spark.SparkException: Task failed while writing rows.
     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:368)
     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:266)
     at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
     at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
     at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
     at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
     at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
     at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
     at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
     at org.apache.spark.scheduler.Task.run(Task.scala:91)
     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815)
     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
     at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
 Caused by: java.util.concurrent.CompletionException: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-driver-11' with higher epoch of '0' is created hence current receiver 'spark-driver-9' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:6b20ec1f0000167e00003d60616e3fe8_G11S2_B0S3, SystemTracker:anueventhubns:eventhub:anueventhub~1023|$default, Timestamp:2021-10-19T03:48:02, errorContext[NS: anueventhubns.servicebus.windows.net, PATH: anueventhub/ConsumerGroups/$Default/Partitions/0, REFERENCE_ID: LN_2374de_1634615267453_a_G11S2, PREFETCH_COUNT: 500, LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
     at com.microsoft.azure.eventhubs.impl.ExceptionUtil.completeExceptionally(ExceptionUtil.java:116)
     at com.microsoft.azure.eventhubs.impl.MessageReceiver.drainPendingReceives(MessageReceiver.java:505)
     at com.microsoft.azure.eventhubs.impl.MessageReceiver.onError(MessageReceiver.java:490)
     at com.microsoft.azure.eventhubs.impl.MessageReceiver.onClose(MessageReceiver.java:790)
     at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.processOnClose(BaseLinkHandler.java:73)
     at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.handleRemoteLinkClosed(BaseLinkHandler.java:109)
     at com.microsoft.azure.eventhubs.impl.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:51)
     at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
     at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
     at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
     at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
     at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
     ... 3 more
 Caused by: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-driver-11' with higher epoch of '0' is created hence current receiver 'spark-driver-9' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. TrackingId:6b20ec1f0000167e00003d60616e3fe8_G11S2_B0S3, SystemTracker:anueventhubns:eventhub:anueventhub~1023|$default, Timestamp:2021-10-19T03:48:02, errorContext[NS: anueventhubns.servicebus.windows.net, PATH: anueventhub/ConsumerGroups/$Default/Partitions/0, REFERENCE_ID: LN_2374de_1634615267453_a_G11S2, PREFETCH_COUNT: 500, LINK_CREDIT: 500, PREFETCH_Q_LEN: 0]
     at com.microsoft.azure.eventhubs.impl.ExceptionUtil.toException(ExceptionUtil.java:43)
     at com.microsoft.azure.eventhubs.impl.MessageReceiver.onClose(MessageReceiver.java:789)
     ... 15 more




azure-databricks · azure-event-hubs

Did my response help you, @anuthereaper? If it solved your issue, please mark it as the accepted answer. Otherwise, let me know how I may better assist.

anuthereaper answered · MartinJaffer-MSFT commented

I found the answer in this LINK.
The issue is having multiple output streaming sinks on a single input stream: writing to each sink causes the streaming DataFrame to be recomputed, so the Event Hub is read once per sink.

The suggested solution is to use foreachBatch or foreach. When I switched to foreachBatch, my PySpark code worked as expected even under high load.
The Spark documentation is HERE and a PySpark example is HERE, which is exactly what I used. A sketch of the approach is below.
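
Below is a minimal sketch of what the foreachBatch version can look like. It is not my exact production code; it assumes the same schemas (events_schema_*), output paths (*_TempPath) and ehConf as in the question, and Combined_CheckpointPath is a hypothetical single checkpoint location:

 # Minimal sketch of the foreachBatch approach (assumptions noted above).
 from pyspark.sql import functions as F

 def write_by_partition(batch_df, batch_id):
     # Each micro-batch is a static DataFrame, so the Event Hub is read only once
     # per trigger; cache the batch before writing it to the three sinks.
     batch_df.persist()

     (batch_df.filter(F.col("partition") == "0")
          .select(F.from_json(F.col("body").cast("string"), events_schema_Optin).alias("Payload_Optin"))
          .coalesce(1)
          .write.mode("append").json(Optin_TempPath))

     (batch_df.filter(F.col("partition") == "1")
          .select(F.from_json(F.col("body").cast("string"), events_schema_Update).alias("Payload_Update"))
          .coalesce(1)
          .write.mode("append").json(Update_TempPath))

     (batch_df.filter(F.col("partition") == "2")
          .select(F.from_json(F.col("body").cast("string"), events_schema_Transaction).alias("Payload_Transaction"))
          .coalesce(1)
          .write.mode("append").json(Transaction_TempPath))

     batch_df.unpersist()

 df = spark.readStream.format("eventhubs").options(**ehConf).load()
 query = (df.writeStream
     .trigger(processingTime="1 minute")
     .option("checkpointLocation", Combined_CheckpointPath)
     .foreachBatch(write_by_partition)
     .start())

With a single streaming query there is only one set of Event Hubs receivers per partition, which should also explain why the ReceiverDisconnectedException stopped appearing.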

Hope this is helpful to anyone else who faces the same problem.



Thank you for sharing the correct solution where I was wrong.

MartinJaffer-MSFT answered

Hello @anuthereaper and welcome to Microsoft Q&A.

From the error message, I concur with your assessment, but I have not yet found anything obvious in the code you shared. I expected the problem to be near where the imported SparkSession is used, but I don't actually see SparkSession used in the code.

Caused by: com.microsoft.azure.eventhubs.ReceiverDisconnectedException: New receiver 'spark-driver-11' with higher epoch of '0' is created hence current receiver 'spark-driver-9' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used.

The error message suggests that the "epoch" is used to determine the seniority of receiver sessions and that it should be unique.
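
Purely as an illustration (an assumption on my part, not something confirmed from your setup): each writeStream query opens its own receivers against the same consumer group, so one commonly suggested mitigation is to give every streaming query its own consumer group. The consumer group names below are hypothetical and would have to be created on the Event Hub first.

 # Hypothetical sketch: one consumer group per streaming query so the
 # receivers do not compete on the same partitions.
 encrypted = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

 ehConf_optin = {'eventhubs.connectionString': encrypted, 'eventhubs.consumerGroup': 'optin-cg'}
 ehConf_update = {'eventhubs.connectionString': encrypted, 'eventhubs.consumerGroup': 'update-cg'}
 ehConf_transaction = {'eventhubs.connectionString': encrypted, 'eventhubs.consumerGroup': 'transaction-cg'}

 df_optin = spark.readStream.format("eventhubs").options(**ehConf_optin).load()
 df_update = spark.readStream.format("eventhubs").options(**ehConf_update).load()
 df_transaction = spark.readStream.format("eventhubs").options(**ehConf_transaction).load()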
