I have a number of orchestrations that should not run concurrently. Specifically, I am transferring ledger data incrementally from one system to another. I am using the singleton pattern to trigger them in their trigger function:
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
config: LedgerDownloadOrchestratorConfig = {
"company": req.route_params["company"],
"ledger": req.route_params["ledger"],
}
instance_id = f"{config['company']}_{config['ledger']}"
existing_instance = await client.get_status(instance_id)
if existing_instance.runtime_status in [
df.OrchestrationRuntimeStatus.Completed,
df.OrchestrationRuntimeStatus.Failed,
df.OrchestrationRuntimeStatus.Terminated,
None,
]:
instance_id = await client.start_new(
"ledger_download_orchestrator", instance_id, config
)
logging.info(f"Started orchestration with ID = '{instance_id}'.")
return client.create_check_status_response(req, instance_id)
However, I want to now create one orchestrator that, when triggered, will run the sub-orchestrators and then wait for them all to complete. I've been looking at the fan-out fan-in pattern, and I have tried this:
def orchestrator_function(context: df.DurableOrchestrationContext):
logging.info(f"Starting meta-orchestrator for all companies")
download_tasks = []
for company in COMPANIES:
for ledger in LEDGERS:
config: LedgerDownloadOrchestratorConfig = {
"company": company,
"ledger": ledger,
}
instance_id = f"{config['company']}_{config['ledger']}"
download_task = yield context.call_sub_orchestrator(
name="ledger_download_orchestrator",
input_=config,
instance_id=instance_id,
)
download_tasks.append(download_task)
yield context.task_all(download_tasks)
logging.info(f"Finished meta-orchestrator for all companies")
This is the closest I've found to fanning out the singleton pattern for orchestrators... However, when I run this orchestrator, I get the following status response:
{
"name": "ledger_downloader_all_orchestrator",
"instanceId": "ledger_downloader_all_companies",
"runtimeStatus": "Failed",
"input": null,
"customStatus": null,
"output": "Non-Deterministic workflow detected: A previous execution of this orchestration scheduled a sub-orchestration task with sequence ID 0 and name 'ledger_download_orchestrator' (version '', instance ID 'AN_Movs_contabilidad'), but the current replay execution hasn't (yet?) scheduled this task. Was a change made to the orchestrator code after this instance had already started running?",
"createdTime": "2023-09-01T08:31:52Z",
"lastUpdatedTime": "2023-09-01T08:31:59Z"
}
How do I solve this issue?