Julian Leopold Thanks for posting your question in Microsoft Q&A. Check out the below sample code I have created as a reference based on the simple durable function code doc as well as similar discussion in Q&A for C#.
import azure.functions as func
import azure.durable_functions as df
import logging
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@myApp.blob_trigger(arg_name="myblob", path="durabletest",
connection="BlobStorageConnectionString")
@myApp.durable_client_input(client_name="client")
async def blob_trigger(myblob: func.InputStream, client):
logging.info(f"Python blob trigger function processed blob"
f"Name: {myblob.name}")
instance_id = await client.start_new("hello_orchestrator", client_input=myblob.name)
# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
blobName: str = context.get_input()
result1 = yield context.call_activity("storeBlob", blobName)
result2 = yield context.call_activity("processBlob", blobName)
return [result1, result2]
# Activity - 1
@myApp.activity_trigger(input_name="blobName")
def storeBlob(blobName: str):
logging.info("storeBlob: Hello " + blobName)
return "storeBlob: Hello " + blobName
# Activity - 2
@myApp.activity_trigger(input_name="blobName")
def processBlob(blobName: str):
logging.info("processBlob: Hello " + blobName)
return "processBlob: Hello " + blobName
Note: BlobStorageConnectionString
value must be set in the application setting or values in local.settings.json
.
You can build the actions (write a custom code) needed in the activity functions for your scenario and handle the responses. Also, refer Orchestrator function code constraints and Bindings for Durable Functions to know more about orchestration trigger and constraints.
I hope this helps and let us know if any questions.
If you found the answer to your question helpful, please take a moment to mark it as "Yes" for others to benefit from your experience. Or simply add a comment tagging me and would be happy to answer your questions.