Using streaming batch for multiple operations

Rohit Sapru 41 Reputation points
2020-12-15T04:00:40.087+00:00

I am new to Spark and Databricks and was trying to find a way to use a batch from an Event Hub stream to run multiple pieces of business logic, but could not find any guidance.

The stream I get from Event Hub is a CDC stream covering multiple tables, and every time a new batch of events arrives, I need to run different business logic for the events captured from each table. I am using the foreachBatch process. I understand that I can move the entire business logic into the same function, but is there a better way of segregating the logic? I attempted the following, but it looks like only the last foreachBatch gets called.

Eventdata
  .writeStream
  .format("memory")
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .foreachBatch(businessprocess1 _)
  .foreachBatch(businessprocess2 _)
  .outputMode("append")
  .start()

Azure Databricks

Accepted answer
  1. PRADEEPCHEEKATLA-MSFT 76,746 Reputation points Microsoft Employee
    2020-12-15T09:46:51.25+00:00

    Hello @Rohit Sapru,

    Welcome to the Microsoft Q&A platform.

    Yes, this is expected behavior: when you chain multiple foreachBatch calls on a single query, only the last one registered is invoked.
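
    Since only the last registered foreachBatch takes effect, a common workaround is to route all of your per-table logic through a single batch function that fans out to each business process. A minimal Scala sketch, reusing the Eventdata stream and the businessprocess1/businessprocess2 functions from your question (their bodies here are placeholders):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.streaming.Trigger

    // Placeholder bodies for the per-table business logic from the question.
    def businessprocess1(batchDF: DataFrame, batchId: Long): Unit = { /* logic for the first table */ }
    def businessprocess2(batchDF: DataFrame, batchId: Long): Unit = { /* logic for the second table */ }

    // Single function registered with foreachBatch; it fans out to every process.
    def processAll(batchDF: DataFrame, batchId: Long): Unit = {
      batchDF.persist()   // cache the micro-batch so each process does not recompute it
      businessprocess1(batchDF, batchId)
      businessprocess2(batchDF, batchId)
      batchDF.unpersist()
    }

    Eventdata
      .writeStream
      .trigger(Trigger.ProcessingTime("60 seconds"))
      .foreachBatch(processAll _)
      .outputMode("append")
      .start()

    Inside processAll you can also filter the batch on a table-identifier column before handing it to each process, which keeps the per-table logic cleanly segregated.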

    foreach and foreachBatch allow you to apply arbitrary operations and custom write logic to the output of a streaming query. They have slightly different use cases: foreach applies custom write logic to every row, while foreachBatch applies arbitrary operations and custom logic to the output of each micro-batch.

    ForeachBatch: foreachBatch(...) allows you to specify a function that is executed on the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java, and Python. The function takes two parameters: a DataFrame or Dataset containing the output data of a micro-batch, and the unique ID of that micro-batch.

    Syntax:

    def foreach_batch_function(df, epoch_id):
        # Transform and write the micro-batch DataFrame `df`
        pass

    streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
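
    If the two pieces of logic need fully independent lifecycles, another option is to start two separate streaming queries over the same source, each with its own foreachBatch and checkpoint location (the first link below discusses this multiple-queries pattern). A sketch, assuming the CDC events carry a table-name column (called tableName here) and using hypothetical checkpoint paths:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.streaming.Trigger

    // Query 1: only events from the first table (the tableName column is an assumption).
    Eventdata.filter(col("tableName") === "table1")
      .writeStream
      .trigger(Trigger.ProcessingTime("60 seconds"))
      .foreachBatch(businessprocess1 _)
      .option("checkpointLocation", "/checkpoints/table1")  // hypothetical path
      .outputMode("append")
      .start()

    // Query 2: only events from the second table, with its own checkpoint.
    Eventdata.filter(col("tableName") === "table2")
      .writeStream
      .trigger(Trigger.ProcessingTime("60 seconds"))
      .foreachBatch(businessprocess2 _)
      .option("checkpointLocation", "/checkpoints/table2")  // hypothetical path
      .outputMode("append")
      .start()

    Keep in mind that each query consumes from Event Hubs independently, so this approach reads every event twice.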
    

    For more details, refer to the links below, which explain more about the usage of foreachBatch:

    https://albertusk95.github.io/posts/2019/11/spark-structured-streaming-multiple-queries/

    https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

    https://stackoverflow.com/questions/58766638/how-to-use-foreach-or-foreachbatch-in-pyspark-to-write-to-database

    https://docs.databricks.com/spark/latest/structured-streaming/foreach.html

    Hope this helps. Do let us know if you have any further queries.

