Durable Functions Python: Running Singleton Sub-orchestrations from a parent orchestration

Giles Antonio Radford 0 Reputation points
2023-09-01T08:50:07.9166667+00:00

I have a number of orchestrations that should not run concurrently. Specifically, I am transferring ledger data incrementally from one system to another, so I start each orchestration with the singleton pattern in its HTTP trigger function:

import logging

import azure.durable_functions as df
import azure.functions as func


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    config: LedgerDownloadOrchestratorConfig = {
        "company": req.route_params["company"],
        "ledger": req.route_params["ledger"],
    }
    instance_id = f"{config['company']}_{config['ledger']}"

    # runtime_status is None when no instance with this ID exists yet
    existing_instance = await client.get_status(instance_id)

    if existing_instance.runtime_status in [
        df.OrchestrationRuntimeStatus.Completed,
        df.OrchestrationRuntimeStatus.Failed,
        df.OrchestrationRuntimeStatus.Terminated,
        None,
    ]:
        instance_id = await client.start_new(
            "ledger_download_orchestrator", instance_id, config
        )
        logging.info(f"Started orchestration with ID = '{instance_id}'.")

    return client.create_check_status_response(req, instance_id)

However, I want to now create one orchestrator that, when triggered, will run the sub-orchestrators and then wait for them all to complete. I've been looking at the fan-out fan-in pattern, and I have tried this:

import logging

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    logging.info(f"Starting meta-orchestrator for all companies")
    download_tasks = []
    for company in COMPANIES:
        for ledger in LEDGERS:
            config: LedgerDownloadOrchestratorConfig = {
                "company": company,
                "ledger": ledger,
            }
            instance_id = f"{config['company']}_{config['ledger']}"
            download_task = yield context.call_sub_orchestrator(
                name="ledger_download_orchestrator",
                input_=config,
                instance_id=instance_id,
            )
            download_tasks.append(download_task)
    yield context.task_all(download_tasks)
    logging.info(f"Finished meta-orchestrator for all companies")

This is the closest I've found to fanning out the singleton pattern for orchestrators... However, when I run this orchestrator, I get the following status response:

{
  "name": "ledger_downloader_all_orchestrator",
  "instanceId": "ledger_downloader_all_companies",
  "runtimeStatus": "Failed",
  "input": null,
  "customStatus": null,
  "output": "Non-Deterministic workflow detected: A previous execution of this orchestration scheduled a sub-orchestration task with sequence ID 0 and name 'ledger_download_orchestrator' (version '', instance ID 'AN_Movs_contabilidad'), but the current replay execution hasn't (yet?) scheduled this task. Was a change made to the orchestrator code after this instance had already started running?",
  "createdTime": "2023-09-01T08:31:52Z",
  "lastUpdatedTime": "2023-09-01T08:31:59Z"
}

How do I solve this issue?

Azure Functions

1 answer

  1. MikeUrnun 9,777 Reputation points Moderator
    2023-09-19T03:30:09.21+00:00

    Hello, @Giles Antonio Radford - Sorry for the late reply. An important aspect of orchestrator functions is that they must be deterministic for the underlying replay pattern to work. Replay means that what is logically a single execution of your orchestration function actually runs many times behind the scenes; each replay must schedule exactly the same tasks, in the same order, as the previous one, so that the framework can build an immutable history of events.
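    To make the replay idea concrete, here is a toy model in plain Python (not the actual Durable Functions runtime, just an illustration of the contract): the engine re-runs the orchestrator generator from the top and checks that each task it schedules matches the recorded history. If the code schedules a different task than the history recorded, it fails the same way your orchestration did.

    ```python
    def replay(orchestrator, history, results):
        # Re-run the orchestrator generator from the start, as the framework
        # does on every replay, checking scheduled tasks against history.
        gen = orchestrator()
        try:
            scheduled = next(gen)  # first task the orchestrator schedules
            for recorded, result in zip(history, results):
                if scheduled != recorded:
                    raise RuntimeError("Non-Deterministic workflow detected")
                # Feed the recorded result back in; the generator then
                # yields the next task it wants to schedule.
                scheduled = gen.send(result)
        except StopIteration as done:
            return done.value


    def good_orchestrator():
        a = yield "task_a"
        b = yield "task_b"
        return a + b


    def changed_orchestrator():
        # Schedules a different task than the history recorded.
        a = yield "task_c"
        return a


    print(replay(good_orchestrator, ["task_a", "task_b"], [1, 2]))  # 3
    try:
        replay(changed_orchestrator, ["task_a"], [1])
    except RuntimeError as e:
        print(e)  # Non-Deterministic workflow detected
    ```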

    In your code above there are two yield expressions: one inside the for loop, yield context.call_sub_orchestrator(), and one outside it, yield context.task_all(). You'll want to remove the one in the loop: yielding there makes the orchestrator wait for each sub-orchestration to finish before scheduling the next, so nothing actually fans out, and it is what trips the non-determinism check on replay. Instead, append the Task objects returned by context.call_sub_orchestrator() to the list and yield context.task_all() once. Overall, the workflow you're attempting looks very similar to the E2_CopyFileToBlob fan-out/fan-in sample: https://github.com/Azure/azure-functions-durable-python/blob/dev/samples-v2/fan_in_fan_out/function_app.py
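    For reference, a sketch of the meta-orchestrator with the in-loop yield removed. This keeps your COMPANIES, LEDGERS, function names, and instance-ID scheme from the question, and I haven't run it against a live Functions host, so treat it as a starting point rather than a drop-in fix:

    ```python
    import logging

    import azure.durable_functions as df


    def orchestrator_function(context: df.DurableOrchestrationContext):
        logging.info("Starting meta-orchestrator for all companies")
        download_tasks = []
        for company in COMPANIES:
            for ledger in LEDGERS:
                config = {"company": company, "ledger": ledger}
                # No yield inside the loop: just collect the Task objects
                # so the sub-orchestrations can fan out in parallel.
                task = context.call_sub_orchestrator(
                    name="ledger_download_orchestrator",
                    input_=config,
                    instance_id=f"{company}_{ledger}",
                )
                download_tasks.append(task)
        # Single yield: fan in and wait for every sub-orchestration at once.
        results = yield context.task_all(download_tasks)
        logging.info("Finished meta-orchestrator for all companies")
        return results


    main = df.Orchestrator.create(orchestrator_function)
    ```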

    Also, why not fan out activity functions (for the async, I/O-heavy operations) instead of sub-orchestrations?
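    To illustrate that suggestion, a hedged sketch of the same fan-out over activities; the activity name ledger_download_activity is hypothetical, and COMPANIES/LEDGERS come from the question:

    ```python
    import azure.durable_functions as df


    def orchestrator_function(context: df.DurableOrchestrationContext):
        # Fan out one activity per company/ledger pair, then fan in once.
        tasks = [
            context.call_activity(
                "ledger_download_activity",  # hypothetical activity function
                {"company": company, "ledger": ledger},
            )
            for company in COMPANIES
            for ledger in LEDGERS
        ]
        results = yield context.task_all(tasks)
        return results


    main = df.Orchestrator.create(orchestrator_function)
    ```

    Note that activities don't have orchestration-level instance IDs, so if you go this route the singleton guarantee has to live in the parent orchestrator (one meta-orchestrator instance) rather than per company/ledger pair.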

