Can you provide me an example how to build a durable function with a blob trigger using the Python v2 programming model?

Julian Leopold 25 Reputation points
2023-09-29T09:55:38.0333333+00:00

I would like to set up a data processing pipeline which process new files that are added to a blob storage.

I need to pass the filename from the blob trigger to an activity function, which will load the file and store it in a database. The further processing functions will have bindings to a PostgreSQL database and load the data from there.

❓ Can you help me by providing an example how I can build this blob triggered durable function in the Python v2 programming model?

Thanks 🙏

Azure Functions
Azure Functions
An Azure service that provides an event-driven serverless compute platform.
4,679 questions
Azure Blob Storage
Azure Blob Storage
An Azure service that stores unstructured data in the cloud as blobs.
2,639 questions
0 comments No comments
{count} votes

Accepted answer
  1. MuthuKumaranMurugaachari-MSFT 22,276 Reputation points
    2023-10-02T15:19:42.8433333+00:00

    Julian Leopold Thanks for posting your question in Microsoft Q&A. Check out the below sample code I have created as a reference based on the simple durable function code doc as well as similar discussion in Q&A for C#.

    import azure.functions as func
    import azure.durable_functions as df
    import logging
    
    myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
    
    @myApp.blob_trigger(arg_name="myblob", path="durabletest",
                                   connection="BlobStorageConnectionString") 
    @myApp.durable_client_input(client_name="client")
    async def blob_trigger(myblob: func.InputStream, client):
        logging.info(f"Python blob trigger function processed blob"
                    f"Name: {myblob.name}")
        instance_id = await client.start_new("hello_orchestrator", client_input=myblob.name)
        
    # Orchestrator
    @myApp.orchestration_trigger(context_name="context")
    def hello_orchestrator(context):
        blobName: str = context.get_input()
        result1 = yield context.call_activity("storeBlob", blobName)
        result2 = yield context.call_activity("processBlob", blobName)
    
        return [result1, result2]
    
    # Activity - 1
    @myApp.activity_trigger(input_name="blobName")
    def storeBlob(blobName: str):
        logging.info("storeBlob: Hello " + blobName)
        return "storeBlob: Hello " + blobName   
    
    # Activity - 2
    @myApp.activity_trigger(input_name="blobName")
    def processBlob(blobName: str):
        logging.info("processBlob: Hello " + blobName)
        return "processBlob: Hello " + blobName 
    

    Note: BlobStorageConnectionString value must be set in the application setting or values in local.settings.json.

    You can build the actions (write a custom code) needed in the activity functions for your scenario and handle the responses. Also, refer Orchestrator function code constraints and Bindings for Durable Functions to know more about orchestration trigger and constraints.

    I hope this helps and let us know if any questions.


    If you found the answer to your question helpful, please take a moment to mark it as "Yes" for others to benefit from your experience. Or simply add a comment tagging me and would be happy to answer your questions.


0 additional answers

Sort by: Most helpful