How to deal with long running Azure Functions in Azure Data Factory

Christian Hansen 21 Reputation points
2020-11-16T13:32:28.887+00:00

I have a Function (HttpTrigger) that can take a while to run (5-10 minutes). It reads some information from the body of the request to perform some actions:

[FunctionName("myFunc")]
public static async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
    ILogger log)
{
    log.LogInformation("C# HTTP trigger function processed a request.");

    // Read and deserialize the request body.
    var content = await new StreamReader(req.Body).ReadToEndAsync();
    myClass body = JsonConvert.DeserializeObject<myClass>(content);

    for (int i = 0; i < 10; i++)
    {
        // Perform tasks totaling 5-10 minutes
        await anotherFunctionAsync(i, body);
    }
    return new OkObjectResult("everything went ok");
}

However, I often run into issues in my Data Factory, where I get an error like:

Operation on target your_function failed: Call to provided Azure function 'myFunc' failed with status-'BadGateway' and message 502 - Web server received an invalid response while acting as a gateway or proxy server.

I understand that this is due to the Function running for a long time.

How can I deal with this, such that Data Factory waits for the function to properly complete and not fail (when it is still running)?

I have already taken a look at the Durable Functions and Best Practices, but I don't know how to apply it when it has to communicate with Data Factory.

Since the function I call is async, could I just skip waiting for the response?


2 answers

  1. Nasreen Akter 10,791 Reputation points
    2020-11-17T16:16:16.23+00:00

    Hi @Christian Hansen ,

    Here is some example code for a Durable Function, along with screenshots showing how to call the Durable Function from Data Factory. Hope it helps. Thanks! :)

    namespace abc  
    {  
        public static class DF_Parse  
        {  
              
            [FunctionName("DF_Parse_HttpStart")]  
            public static async Task<HttpResponseMessage> HttpStart([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req, [DurableClient] IDurableOrchestrationClient starter, ILogger log)  
            {  
                // Function input comes from the request content.  
                string requestBody = await req.Content.ReadAsStringAsync();  
                string instanceId = await starter.StartNewAsync("DF_Parse", null, requestBody);  
      
                log.LogInformation($"Started orchestration with ID = '{instanceId}'.");  
      
                return starter.CreateCheckStatusResponse(req, instanceId);  
            }  
      
            [FunctionName("DF_Parse")]  
            public static async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)  
            {  
                log.LogInformation("DF_Parse orchestration started.");  
                try  
                {  
                    string sRequestBody = context.GetInput<string>();  
                    List<Task<string>> contents = new List<Task<string>>();  
                    myClass body = JsonConvert.DeserializeObject<myClass>(sRequestBody);  
                    for (int i = 0; i < 10; i++)  
                    {  
                        JObject parameters = new JObject();  
                        parameters.Add("iParam", i);  
                        parameters.Add("myClass", JsonConvert.SerializeObject(body));  
                        contents.Add(context.CallSubOrchestratorAsync<string>("DF_Parse_AnotherFunctionAsync", JsonConvert.SerializeObject(parameters)));  
                    }  
                    await Task.WhenAll(contents);  
      
                    /* you can also check and work further with the response on each call if you want  
                    //create a dictionary object from the responses  
                    foreach (Task<string> eachTask in contents)  
                    {  
                        if (eachTask.IsCompletedSuccessfully)  
                        {  
                            record = ParseJsonString(eachTask.Result, log);  
      
                            if (record.Count > 0)  
                            {  
                                totalRecords.Add(record);  
                            }  
                        }  
                    }  
                    */  
                }  
                catch (Exception ex)  
                {  
                    log.LogError(ex, "DF_Parse failed!");  
                    throw;  // rethrow without resetting the stack trace
                }  
            }  
      
            
            [FunctionName("DF_Parse_AnotherFunctionAsync")]  
            public static async Task<string> AnotherFunctionAsync([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)  
            {  
                try  
                {  
                    string input = context.GetInput<string>();  
                    List<string> list = await context.CallActivityAsync<List<string>>("DF_Parse_ActivityFunction", input);  
      
                    // Return the activity output as JSON so the parent's Task<string> carries a result.  
                    return JsonConvert.SerializeObject(list);  
                }  
                catch (Exception ex)  
                {  
                    log.LogError(ex, "DF_Parse_AnotherFunctionAsync failed!");  
                    throw;  
                }  
            }  
      
            [FunctionName("DF_Parse_ActivityFunction")]  
            public static async Task<List<string>> ActivityFunction([ActivityTrigger] string input, ILogger log)  
            {  
                log.LogInformation($"ActivityFunction() started...");  
      
                Dictionary<string, dynamic> parameters;  
                List<string> outputList = new List<string>();  
      
                try  
                {  
                    parameters = JsonConvert.DeserializeObject<Dictionary<string, dynamic>>(input);  
                   //function body  
                    log.LogInformation("Successful!");  
                }  
                catch (Exception ex)  
                {  
                    log.LogError(ex, "DF_Parse_ActivityFunction failed!");  
                    throw;  
                }  
      
                return outputList;  
            }  
        }  
    }  
    
    
    //host.json file  
      
    {  
      "version": "2.0",  
      "extensions": {  
        "durableTask": {  
          "hubName": "ParseTaskQueue",  
          "maxConcurrentActivityFunctions": 20,  
          "maxConcurrentOrchestratorFunctions": 20,  
          "extendedSessionsEnabled": false,  
          "extendedSessionIdleTimeoutInSeconds": 30,  
          "useGracefulShutdown": false  
        }  
      },  
      "logging": {  
        "logLevel": {  
          "Host.Triggers.DurableTask": "Information"  
        }  
      }  
    }  
    

    [Screenshots: 40475-durableazurefunction.jpg, 40476-webactivity.jpg, 40463-checkstatus.jpg]

    4 people found this answer helpful.

  2. MartinJaffer-MSFT 26,056 Reputation points
    2020-11-16T20:59:23.573+00:00

    Hello @ChristianHansen-0520 and thank you for your question.

    The general idea is to have the function acknowledge receipt of the request from Data Factory and reply with an endpoint that the Factory can check later to get the status.

    In the pipeline, the Function activity would be followed by an Until loop. The Until loop would contain a Web activity which queries the endpoint returned by the Function activity to get its status (success, fail, or running). The loop would repeat as long as the Web activity receives the "running" status from the endpoint, and exit on a success or fail response.
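
    As a rough sketch, the Until loop might look something like the pipeline fragment below. The activity names (StartFunction, WaitForFunction, Pause, CheckStatus), the 30-second pause, and the one-hour timeout are placeholders I made up. It also assumes the function replies with a Durable Functions style status payload, i.e. a statusQueryGetUri in the first response and a runtimeStatus field in the status response. Note that the Until expression is the exit condition, so it is written as "not Pending and not Running".

    ```json
    {
      "name": "WaitForFunction",
      "type": "Until",
      "dependsOn": [
        { "activity": "StartFunction", "dependencyConditions": [ "Succeeded" ] }
      ],
      "typeProperties": {
        "expression": {
          "value": "@not(or(equals(activity('CheckStatus').output.runtimeStatus, 'Pending'), equals(activity('CheckStatus').output.runtimeStatus, 'Running')))",
          "type": "Expression"
        },
        "timeout": "0.01:00:00",
        "activities": [
          {
            "name": "Pause",
            "type": "Wait",
            "typeProperties": { "waitTimeInSeconds": 30 }
          },
          {
            "name": "CheckStatus",
            "type": "WebActivity",
            "dependsOn": [
              { "activity": "Pause", "dependencyConditions": [ "Succeeded" ] }
            ],
            "typeProperties": {
              "url": {
                "value": "@activity('StartFunction').output.statusQueryGetUri",
                "type": "Expression"
              },
              "method": "GET"
            }
          }
        ]
      }
    }
    ```

    After the loop you would still want an If Condition (or similar) that inspects the last runtimeStatus, since the loop exits on both success and failure.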

    If you do not want to loop a web activity, there is another option. If you have the Function App write its success/fail to a file, then you can use the Validation activity to do the polling for you. The Validation activity can check for the presence of a file, doing retries for a set amount of time.
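
    A Validation activity for this could be sketched roughly as follows. The dataset name FunctionResultMarker is hypothetical (it would point at the file the Function App writes), and the timeout and sleep values are only examples.

    ```json
    {
      "name": "WaitForMarkerFile",
      "type": "Validation",
      "typeProperties": {
        "dataset": {
          "referenceName": "FunctionResultMarker",
          "type": "DatasetReference"
        },
        "timeout": "0.00:30:00",
        "sleep": 30,
        "minimumSize": 1
      }
    }
    ```

    Here sleep is the number of seconds between checks, and timeout is how long the activity keeps retrying before it fails. The file's presence only tells you the function finished, so the pipeline would still need to read the file to tell success from failure.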

    Another option might be to use a Webhook activity. In this case, Data Factory would pass an endpoint to the Function App and expect the Function App to make a separate call back in the allotted time.
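
    A Webhook activity might be configured along these lines. The URL placeholders and the body content are assumptions for illustration only. Data Factory adds a callBackUri property to the request body it sends, and the Function App has to POST to that URI once the work is done.

    ```json
    {
      "name": "CallFunctionAndWait",
      "type": "WebHook",
      "typeProperties": {
        "url": "https://<function-app>.azurewebsites.net/api/<function-name>",
        "method": "POST",
        "headers": { "Content-Type": "application/json" },
        "body": { "jobId": "example" },
        "timeout": "00:30:00"
      }
    }
    ```

    The timeout is the allotted time mentioned above. As far as I know it defaults to 10 minutes, so a 5-10 minute function would probably need it raised. The function also still has to return from the initial request quickly and make the callback from separate work (for example, the last step of a Durable orchestration).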

    Can you tell me where you see the option in Data Factory to "remove waiting for response"?