I have an Azure Function that pulls messages from a Service Bus queue and calls an orchestrator.
This orchestrator calls several other sub-orchestrators.
All the orchestrators and sub-orchestrators use activities for non-deterministic things, so everything in the orchestrators is deterministic.
However I always get an error when my second sub-orchestrator starts, that my first sub-orchestrator was non-deterministic.
Between the two sub-orchestrators there is even an activity with yield and this does not cause the error to occur so I am fairly certain that it does not have anything to do with an orchestrator being non-deterministic.
Is there a known error for Python Functions that causes this? Because at this point I am all out of ideas.
The following is the code I can share.
Call to sub-orchestrator:
running_queue_output = yield context.call_sub_orchestrator(
"waiting_for_running_queue_sub_orchestrator",
wait_for_running_queue_input
)
Sub-Orchestrator being called:
def waiting_for_running_queue_sub_orchestrator(context: df.DurableOrchestrationContext):
"""
Polls the running-queue until a spot has opened up. Then writes a message to the running queue
to claim a spot.
"""
sub_orchestrator_input = context.get_input()
sb_client_json_serialized = sub_orchestrator_input["service_bus_client"]
logging_client_json_serialized = sub_orchestrator_input["logging_client"]
content = sub_orchestrator_input["content"]
log.error(f"########## Waiting for running queue to open up")
log_client = ApplicationInsightsLoggerClient.from_json(logging_client_json_serialized)
answer_total_messages = yield context.call_activity(
"get_total_running_queue_messages",
sb_client_json_serialized
)
if not answer_total_messages["success"]:
return answer_total_messages
current_messages = answer_total_messages["number_of_messages"]
log_client.log_to_application_insights(
message = f"There are currently {current_messages} in the running queue."
)
if current_messages >= FABRIC_LIMIT:
answer_random_wait_period = yield context.call_activity("get_random_wait_period", INTERVAL)
if not answer_random_wait_period["success"]:
return answer_random_wait_period
wait_interval_in_seconds = answer_random_wait_period["wait_interval_in_seconds"]
time_of_next_polling = context.current_utc_datetime + datetime.timedelta(seconds = wait_interval_in_seconds)
log_client.log_to_application_insights(
message = f"Queue is currently full. Will poll again at {time_of_next_polling}"
)
yield context.create_timer(fire_at = time_of_next_polling)
context.continue_as_new(sub_orchestrator_input)
log_client.log_to_application_insights(
message = "A spot in the running queue opened up. Reserving that spot."
)
answer_write_to_queue = yield context.call_activity(
"write_to_service_bus_queue",
(sb_client_json_serialized, content, "running")
)
if not answer_write_to_queue["success"]:
return answer_write_to_queue
log_client.log_to_application_insights(
message = "Successfully reserved a spot in the running queue"
)
waiting_for_running_queue_output = {
"success": True
}
return waiting_for_running_queue_output
I am beginning to suspect context.current_utc_datetime of being the issue but I'm not sure. Any ideas are welcome.