The durable entity function I've set up as an output for a Stream Analytics job doesn't trigger or receive any data. I can see incoming input data in the Stream Analytics job, and the job writes that data to storage correctly (through a storage output I also set up as a test). If I test the CalculatePosition function with Postman, the function and the entity receive the data and update the state correctly.
I also get a binding error for the function app and I can't see what's missing:
Stream Analytics job output with my durable function app set up:
I selected the key name "default" when creating the output. I don't see anywhere in the function app, including local.settings.json, where I'd need to enter the hashed key.
My function app:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace BladeFunctionApps
{
    public static class TransformAlgorithm
    {
        // Entity function: keeps the intermediate calculation state between calls.
        public static async Task Process_Algorithm([EntityTrigger] IDurableEntityContext context, ILogger log)
        {
            // Set or get intermediate calculation state values
            Algorithm_SaveStateVariables intermediate_calc_results;
            if (context.HasState)
            {
                intermediate_calc_results = context.GetState<Algorithm_SaveStateVariables>();
                log.LogInformation("Success getting state values: CumItems:" + intermediate_calc_results.DataItemsNumber +
                    " Avg:" + intermediate_calc_results.AverageMagnetometerX);
            }
            else
            {
                log.LogError("Failed to get state values.");
                intermediate_calc_results = new Algorithm_SaveStateVariables();
            }

            // Get values from the incoming JSON object and perform calculations
            JObject deviceBatchMessage = (JObject)JsonConvert.DeserializeObject(context.GetInput<string>());
            Random rnd = new Random();
            int incomingDataLines = rnd.Next(1, 10);
            intermediate_calc_results.DataItemsNumber = incomingDataLines++;
            bool parsebool = float.TryParse(deviceBatchMessage["body"]["magnetX"].ToString(), out float tempMagnX);
            if (parsebool)
                intermediate_calc_results.AverageMagnetometerX += tempMagnX;
            else
            {
                log.LogError("Could not parse to float: " + deviceBatchMessage["body"]["magnetX"].ToString());
                intermediate_calc_results.AverageMagnetometerX += 0;
            }
            context.SetState(intermediate_calc_results);
        }

        // HTTP-triggered function configured as the Stream Analytics output.
        [FunctionName("CalculatePosition")]
        public static async Task<ActionResult> CalculateTransform(
            [HttpTrigger(AuthorizationLevel.Function, "get", Route = null)] HttpRequest req,
            [DurableClient] IDurableEntityClient client, ILogger log)
        {
            // Read the data: extract the body from the request
            log.LogInformation("I'm at the top of CalculatePosition");
            string jsonContent = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(jsonContent))
            {
                log.LogInformation("CalculatePosition: Received Data null or empty.");
                return new StatusCodeResult(204); // 204, ASA connectivity check
            }
            else
                log.LogInformation("CalculatePosition: Received Data:" + jsonContent);

            // Send the data to the algorithm entity for processing
            var entityId = new EntityId(nameof(Process_Algorithm), "ProcessTransformAlgorithm");
            await client.SignalEntityAsync(entityId, "FullOperation", jsonContent);
            return new OkObjectResult("CalculatePosition: CalculatePosition Success");
        }
    }
}
What binding is missing?
I think the input/output setup with the query in the Stream Analytics job should work, because it's the same setup I use to output into a blob: it works with the blob output, but not with the Azure Function output.
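Since the query and input are shared with the blob output, one difference may be the shape of what the function receives. As far as I understand, Stream Analytics delivers events to an Azure Functions output in batches, as a JSON array, whereas the entity code above casts the payload to a single JObject. The following is only a sketch under that assumption: `BatchBodySketch`, `ToEvents`, and `TryGetMagnetX` are hypothetical names that are not part of the function app above, and the `body.magnetX` path is taken from the entity code.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using Newtonsoft.Json.Linq;

// Hypothetical helper, not part of the original function app.
public static class BatchBodySketch
{
    // Normalizes a request body into a list of JSON objects, whether the
    // body is a single object or an array of objects (assumed batch shape).
    public static List<JObject> ToEvents(string body)
    {
        var events = new List<JObject>();
        if (string.IsNullOrWhiteSpace(body))
            return events; // empty body, e.g. a connectivity check

        JToken token = JToken.Parse(body);
        if (token is JArray array)
        {
            foreach (JToken item in array)
            {
                if (item is JObject obj)
                    events.Add(obj);
            }
        }
        else if (token is JObject single)
        {
            events.Add(single);
        }
        return events;
    }

    // Reads body.magnetX from one event; returns false when the value is
    // missing or cannot be interpreted as a number.
    public static bool TryGetMagnetX(JObject evt, out float value)
    {
        value = 0f;
        JToken token = evt.SelectToken("body.magnetX");
        if (token == null)
            return false;

        switch (token.Type)
        {
            case JTokenType.Integer:
            case JTokenType.Float:
                value = (float)token;
                return true;
            case JTokenType.String:
                return float.TryParse((string)token, NumberStyles.Float,
                    CultureInfo.InvariantCulture, out value);
            default:
                return false;
        }
    }
}
```

With a helper like this, each returned JObject could be signalled to the entity separately, so the entity would keep receiving one object per operation.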