How to alert when Event Hub incoming and outgoing messages are not equal

CareyBoldenow 96 Reputation points
2021-10-14T15:12:41.997+00:00

There is currently a bug in the Azure Event Hub Java SDK that can cause an Event Hub consumer to stop listening to a given partition. When this happens, no explicit error is raised on the application side, so we have no way of knowing when it occurs. Currently, the only way we can tell is to periodically pull up the chart of incoming and outgoing messages for all our Event Hubs and make sure the flows are even for each. What we really need is an alert that compares the total incoming and outgoing messages for a given Event Hub over some interval of time and fires if they are not the same.

I have gone through all the out-of-the-box alert capabilities for Event Hubs, and all we can do is check whether incoming or outgoing messages are greater than or less than some value; there is no way to compare one to the other. We did enable diagnostic logging for our Event Hub, but I don't see any Log Analytics query that would give me what I need. I do see some diagnostic logging that captures bulk incoming and outgoing messages, but it covers the Event Hub namespace as a whole, and we need to measure incoming and outgoing messages per Event Hub instance.

Are there any options that might provide the details we are looking for? What I really need is a query, or some other way, to pull back the same data you see in the chart below:

[Image: incoming/outgoing messages chart]


3 answers

  1. Saurabh Sharma 23,846 Reputation points Microsoft Employee Moderator
    2021-10-23T00:52:21.633+00:00

    Hi @CareyBoldenow ,

    At the moment, there is no public API to determine whether a receiver has stopped receiving. Depending on the cause, it may have stopped because something hung while the receiver was being re-created.

    One workaround you could use is to leverage your checkpointing information and check the last-updated metadata of your Blob Storage checkpoints.
    Please also provide your feedback on the Event Hubs feedback channel.
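A minimal sketch of that checkpoint-based check, assuming your checkpoint store keeps one blob per partition and that you have already read each blob's last-modified timestamp (for example through the Azure Storage SDK; that step is not shown). The function name `find_stale_partitions` and the `max_age` threshold are hypothetical, chosen for illustration only. A partition whose checkpoint has not moved within `max_age` is a candidate for a consumer that has stopped listening.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def find_stale_partitions(
    last_modified: dict[str, datetime],
    now: datetime,
    max_age: timedelta,
) -> list[str]:
    """Return partition IDs whose checkpoint has not been updated within max_age.

    last_modified maps partition ID -> last-modified time of that partition's
    checkpoint blob (hypothetical input; fetching it is out of scope here).
    """
    stale = [
        partition_id
        for partition_id, modified in last_modified.items()
        if now - modified > max_age
    ]
    # Sort so the result is deterministic regardless of dict order.
    return sorted(stale)


if __name__ == "__main__":
    now = datetime(2021, 10, 14, 15, 0, tzinfo=timezone.utc)
    checkpoints = {
        "0": now - timedelta(minutes=2),
        "1": now - timedelta(hours=3),  # has not advanced for hours
        "2": now - timedelta(minutes=7),
    }
    print(find_stale_partitions(checkpoints, now, timedelta(minutes=15)))
```

Keep in mind that a partition receiving no new events will not advance its checkpoint either, so `max_age` should be longer than your normal checkpoint interval, and a stale partition is worth cross-checking against incoming traffic before alerting.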

    Thanks
    Saurabh


  2. Mohammadreza Pasandideh 1 Reputation point
    2022-01-26T21:34:02.12+00:00

    Hi @CareyBoldenow ,

    I started exploring the option of using Kusto queries. For this, we first need to send the metrics to the diagnostic logs:

    [Image: diagnostic setting]

    This lets us query the metrics from the logs.

    This is the query:

    AzureMetrics
    | where TimeGenerated > ago(10m)
    | where MetricName == "OutgoingMessages" or MetricName == "IncomingMessages"
    | extend Total_Outgoing_Messages = iif(MetricName == "OutgoingMessages", Total, 0.00)
    | extend Total_Incoming_Messages = iif(MetricName == "IncomingMessages", Total, 0.00)
    | summarize sum(Total_Outgoing_Messages), sum(Total_Incoming_Messages) by TimeGenerated
    | extend delta_messages = abs(sum_Total_Incoming_Messages - sum_Total_Outgoing_Messages)

    Now that we have delta_messages, we can alert when the delta is greater than zero, or express it relative to the incoming count:

    AzureMetrics
    | where TimeGenerated > ago(10m)
    | where MetricName == "OutgoingMessages" or MetricName == "IncomingMessages"
    | extend Total_Outgoing_Messages = iif(MetricName == "OutgoingMessages", Total, 0.00)
    | extend Total_Incoming_Messages = iif(MetricName == "IncomingMessages", Total, 0.00)
    | summarize sum(Total_Outgoing_Messages), sum(Total_Incoming_Messages) by TimeGenerated
    | extend delta_messages = abs(sum_Total_Incoming_Messages - sum_Total_Outgoing_Messages)/sum_Total_Incoming_Messages
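To make the arithmetic of these two queries concrete, here is a rough Python equivalent that works on a hypothetical in-memory list of (TimeGenerated, MetricName, Total) rows, mirroring the AzureMetrics columns the queries use. It only illustrates the pivot-and-delta logic and is not a replacement for the Log Analytics query. One difference is deliberate: the relative delta returns None when nothing came in, where the query would divide by a zero incoming count.

```python
from __future__ import annotations

from collections import defaultdict


def message_deltas(rows):
    """Mirror the pivot + delta steps of the AzureMetrics queries.

    rows: iterable of (time_generated, metric_name, total) tuples, a
    hypothetical stand-in for AzureMetrics rows.
    Returns {time_generated: (incoming, outgoing, abs_delta, relative_delta)}.
    """
    incoming = defaultdict(float)
    outgoing = defaultdict(float)
    for time_generated, metric_name, total in rows:
        # Same idea as the two iif() columns: route each row's Total into one sum.
        if metric_name == "IncomingMessages":
            incoming[time_generated] += total
        elif metric_name == "OutgoingMessages":
            outgoing[time_generated] += total

    result = {}
    for t in sorted(set(incoming) | set(outgoing)):
        inc, out = incoming[t], outgoing[t]
        abs_delta = abs(inc - out)
        # Relative delta as in the second query, guarded against zero incoming.
        rel_delta = abs_delta / inc if inc else None
        result[t] = (inc, out, abs_delta, rel_delta)
    return result


if __name__ == "__main__":
    rows = [
        ("15:00", "IncomingMessages", 120.0),
        ("15:00", "OutgoingMessages", 120.0),
        ("15:01", "IncomingMessages", 100.0),
        ("15:01", "OutgoingMessages", 25.0),
    ]
    for t, (inc, out, delta, rel) in message_deltas(rows).items():
        print(t, inc, out, delta, rel)
```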


    And this is the alert.

    [Image: alert rule]


  3. Mohammadreza Pasandideh 1 Reputation point
    2022-02-07T19:13:50.863+00:00

    This is a better query:

    AzureMetrics
    | where MetricName == "OutgoingMessages" or MetricName == "IncomingMessages"
    | extend Total_Outgoing_Messages = iif(MetricName == "OutgoingMessages", Total, 0.00)
    | extend Total_Incoming_Messages = iif(MetricName == "IncomingMessages", Total, 0.00)
    | summarize sum(Total_Outgoing_Messages), sum(Total_Incoming_Messages) by TimeGenerated
    | extend delta_messages = iff(sum_Total_Incoming_Messages > sum_Total_Outgoing_Messages, 100*abs(sum_Total_Incoming_Messages - sum_Total_Outgoing_Messages)/sum_Total_Incoming_Messages , 100*abs(sum_Total_Incoming_Messages - sum_Total_Outgoing_Messages)/sum_Total_Outgoing_Messages)
    | summarize AggregatedValue = count() by delta_messages, bin(TimeGenerated, 5m)
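The percentage in this query is the gap relative to the larger of the two totals (the iff() picks incoming when it is larger, otherwise outgoing), which keeps it between 0 and 100. The Python sketch below restates that formula and the 5-minute binning. The `threshold_pct` parameter and the idea of counting breaching samples per bin are assumptions about how you might drive the alert, not part of the original query. When both totals are zero the query would divide by zero, so the sketch treats that case as a 0% gap.

```python
from __future__ import annotations

from collections import Counter
from datetime import datetime, timedelta


def percent_delta(incoming: float, outgoing: float) -> float:
    """100 * |incoming - outgoing| / max(incoming, outgoing), equivalent to the iff() above."""
    larger = max(incoming, outgoing)
    if larger == 0:
        return 0.0  # no traffic either way: treat as balanced instead of dividing by zero
    return 100 * abs(incoming - outgoing) / larger


def bin_5m(ts: datetime) -> datetime:
    """Round down to a 5-minute boundary, similar to bin(TimeGenerated, 5m)."""
    return ts - timedelta(minutes=ts.minute % 5, seconds=ts.second, microseconds=ts.microsecond)


def breaches_per_bin(samples, threshold_pct: float) -> Counter:
    """Count samples per 5-minute bin whose percent delta exceeds threshold_pct.

    samples: iterable of (timestamp, incoming, outgoing) tuples.
    threshold_pct: hypothetical alert threshold you would tune.
    """
    counts = Counter()
    for ts, incoming, outgoing in samples:
        if percent_delta(incoming, outgoing) > threshold_pct:
            counts[bin_5m(ts)] += 1
    return counts
```

For example, a sample with 100 incoming and 25 outgoing messages gives a 75% gap, whichever side is larger.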

