Not able to receive messages in sequential order from concurrent session in Azure Service Bus

Anirud Thapliyal 96 Reputation points
2021-07-13T07:22:56.667+00:00

I am new to Azure Service Bus and am trying to understand how a session-enabled queue works.

I have three console applications: one message sender app, which sends messages to the queue, and two message receiver apps.
The Azure Service Bus queue is session-enabled, and I am trying to implement concurrent sessions.

Each user who runs the message sender console application sends a message to the queue, and this message has to be received by both receiver console applications.
The receiver applications must receive and process each message in sequential order: Receiver1 first, then Receiver2. To do that, I tag each message with the receiver application name when sending, and the receiver code checks that tag to decide whether to process the message.

I am not receiving messages in sequential order within a session ID.

Below is the code I am using for sending and receiving messages.

Message Sender code:

public static async Task Main(string[] args)
{
    await SendSessionMessagesAsync();
}

public static async Task SendSessionMessagesAsync()
{
    // userList comes from the database (not shown).

    foreach (var user in userList)
    {
        var receiverApplicationNameList = new List<ReceiverApplicationName>();
        var receiverApplicationNames = // getting the ReceiverApplicationName list from appsettings.json, [ "Receiver1", "Receiver2" ]

        // Build one tagged payload per receiver application for this user.
        foreach (var item in receiverApplicationNames)
        {
            var receiverApplicationName = new ReceiverApplicationName();
            receiverApplicationName.UserId = user.Id;
            receiverApplicationName.ApplicationName = item;
            receiverApplicationNameList.Add(receiverApplicationName);
        }

        // Send both payloads in the same session, keyed by user ID.
        foreach (var queueMessageItem in receiverApplicationNameList)
        {
            var messageBody = JsonConvert.SerializeObject(queueMessageItem);
            var message = new Message(Encoding.UTF8.GetBytes(messageBody));
            message.SessionId = "Session " + queueMessageItem.UserId;
            Console.WriteLine($"Sending message for UserId: {queueMessageItem.UserId}, SessionId: {message.SessionId}, Message: {messageBody}");
            await queueClient.SendAsync(message); // 2 messages (Receiver1 and Receiver2) for each user
        }
    }
}

Message Receiver code(Receiver1):

public static async Task Main(string[] args)
{
    RegisterOnSessionHandlerAndReceiveSessionMessages();

    // Keep the process alive so the session handler can keep receiving.
    Console.ReadKey();
    await queueClient.CloseAsync();
}

public static void RegisterOnSessionHandlerAndReceiveSessionMessages()
{
    var sessionHandlerOptions = new SessionHandlerOptions(ExceptionReceivedHandler)
    {
        MaxConcurrentSessions = 2,
        MessageWaitTimeout = TimeSpan.FromSeconds(5),
        AutoComplete = false,
    };
    queueClient.RegisterSessionHandler(ProcessSessionMessagesAsync, sessionHandlerOptions);
}

public static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
    return Task.CompletedTask;
}

public static async Task ProcessSessionMessagesAsync(IMessageSession session, Message message, CancellationToken token)
{
    var result = JsonConvert.DeserializeObject<ReceiverApplicationName>(Encoding.UTF8.GetString(message.Body));

    // Messages tagged for the other receiver are neither completed nor abandoned here.
    if (result.ApplicationName == "Receiver1")
    {
        Console.WriteLine($"Received message for UserId: {result.UserId}, Session: {session.SessionId}, Message: SequenceNumber: {message.SystemProperties.SequenceNumber}, Body:{Encoding.UTF8.GetString(message.Body)}, at: {DateTime.Now}");
        await session.CompleteAsync(message.SystemProperties.LockToken);
    }
}

Message Receiver code(Receiver2):

public static async Task Main(string[] args)
{
    RegisterOnSessionHandlerAndReceiveSessionMessages();

    // Keep the process alive so the session handler can keep receiving.
    Console.ReadKey();
    await queueClient.CloseAsync();
}

public static void RegisterOnSessionHandlerAndReceiveSessionMessages()
{
    var sessionHandlerOptions = new SessionHandlerOptions(ExceptionReceivedHandler)
    {
        MaxConcurrentSessions = 2,
        MessageWaitTimeout = TimeSpan.FromSeconds(5),
        AutoComplete = false,
    };
    queueClient.RegisterSessionHandler(ProcessSessionMessagesAsync, sessionHandlerOptions);
}

public static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
    return Task.CompletedTask;
}

public static async Task ProcessSessionMessagesAsync(IMessageSession session, Message message, CancellationToken token)
{
    var result = JsonConvert.DeserializeObject<ReceiverApplicationName>(Encoding.UTF8.GetString(message.Body));

    // Messages tagged for the other receiver are neither completed nor abandoned here.
    if (result.ApplicationName == "Receiver2")
    {
        Console.WriteLine($"Received message for UserId: {result.UserId}, Session: {session.SessionId}, Message: SequenceNumber: {message.SystemProperties.SequenceNumber}, Body:{Encoding.UTF8.GetString(message.Body)}, at: {DateTime.Now}");
        await session.CompleteAsync(message.SystemProperties.LockToken);
    }
}

Results:

The timestamps of the messages received by Receiver1 and Receiver2 are not in sequence. For example, for UserId = 2, Receiver1 should receive its message before Receiver2 receives its message. Or maybe I am doing something incorrect here.

Message sender:
114223-image.png

Receiver1:
114165-image.png

Receiver2:
114202-image.png

Azure Service Bus
C#

2 answers

  1. Bruce (SqlWork.com) 68,236 Reputation points
    2021-07-13T15:01:37.883+00:00

    Each receiver receives the messages for a given session in order. The order in which sessions are handed out is not defined. While one receiver holds the lock on a session, another receiver waiting on that lock will process a different session.

    If you want processing to be receiver 1, then receiver 2, receiver 1 should queue the request to receiver 2 after it has processed it.

    Even if you used one session, both receivers would see the messages in order, but the messages would not be processed in receiver order: receiver 2 could process several of its messages before receiver 1 processed any.


  2. Pramod Valavala 20,641 Reputation points Microsoft Employee
    2021-07-14T08:23:36.763+00:00

    @Anirud Thapliyal Message Sessions are useful when particular messages need to be handled sequentially even when multiple receivers are listening for messages on the same queue. But if you had only one receiver per queue, you could simply process messages in order by fetching one message at a time.

    Based on the scenario you described, it would be best to have two separate queues, one for Receiver 1 and one for Receiver 2. The sender would send messages into Queue 1, which Receiver 1 processes. Receiver 1 then sends the same message to Queue 2 for Receiver 2 to process.
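The two-queue chain described above could look roughly like the following for Receiver 1, using the same `Microsoft.Azure.ServiceBus` package as the question. This is only a sketch under assumptions: the connection string placeholder and the queue names `receiver1-queue` and `receiver2-queue` are hypothetical, and both queues are assumed to be created without sessions enabled.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

public static class Receiver1Program
{
    // Hypothetical values: replace with your own connection string and queue names.
    private const string ConnectionString = "<service-bus-connection-string>";
    private const string Queue1Name = "receiver1-queue";
    private const string Queue2Name = "receiver2-queue";

    private static IQueueClient queue1Client;
    private static IQueueClient queue2Client;

    public static async Task Main()
    {
        queue1Client = new QueueClient(ConnectionString, Queue1Name);
        queue2Client = new QueueClient(ConnectionString, Queue2Name);

        var options = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            MaxConcurrentCalls = 1, // handle one message at a time
            AutoComplete = false    // complete explicitly, only after forwarding
        };
        queue1Client.RegisterMessageHandler(ProcessAndForwardAsync, options);

        Console.WriteLine("Receiver1 is listening. Press any key to exit.");
        Console.ReadKey();

        await queue1Client.CloseAsync();
        await queue2Client.CloseAsync();
    }

    private static async Task ProcessAndForwardAsync(Message message, CancellationToken token)
    {
        Console.WriteLine($"Receiver1 processed: {Encoding.UTF8.GetString(message.Body)}");

        // Forward a copy to Queue 2 before completing the original. If completing
        // fails afterwards, the message is redelivered and forwarded again, so
        // Receiver 2 should tolerate duplicates.
        await queue2Client.SendAsync(message.Clone());
        await queue1Client.CompleteAsync(message.SystemProperties.LockToken);
    }

    private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs args)
    {
        Console.WriteLine($"Message handler encountered an exception: {args.Exception}");
        return Task.CompletedTask;
    }
}
```

Receiver 2 would register an equivalent handler on Queue 2 without the forwarding step. Because a message only reaches Queue 2 after Receiver 1 has handled it, Receiver 2 cannot process a user's message before Receiver 1 does.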

    That being said, you could still achieve the above with a single queue and sessions by changing the Session ID of your messages to be Receiver 1 and Receiver 2. Instead of fetching any available session, each receiver will specifically process one defined session using a session processor.

    After Receiver 1 processes a message, it would send the same message back to the queue, but with the Session ID for Receiver 2.
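A rough sketch of the single-queue variant for Receiver 1 follows. It makes several assumptions: it uses the older `Microsoft.Azure.ServiceBus` package from the question, where accepting one named session through `SessionClient.AcceptMessageSessionAsync` stands in for the session processor mentioned above; the queue name `session-queue` and the session IDs `Receiver1` and `Receiver2` are hypothetical; and `ForwardTo` is an illustrative helper, not part of the SDK.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

public static class NamedSessionReceiver1
{
    // Hypothetical values: replace with your own connection string and queue name.
    private const string ConnectionString = "<service-bus-connection-string>";
    private const string QueueName = "session-queue"; // must be session-enabled

    // Illustrative helper: copy a received message and address the copy
    // to the next receiver's session.
    public static Message ForwardTo(Message original, string nextSessionId)
    {
        var copy = original.Clone();
        copy.SessionId = nextSessionId;
        return copy;
    }

    public static async Task Main()
    {
        var sessionClient = new SessionClient(ConnectionString, QueueName);
        var sender = new QueueClient(ConnectionString, QueueName);

        // Accept only the session addressed to this receiver.
        IMessageSession session = await sessionClient.AcceptMessageSessionAsync("Receiver1");

        while (true)
        {
            // ReceiveAsync returns null when no message arrives within the timeout.
            Message message = await session.ReceiveAsync(TimeSpan.FromSeconds(5));
            if (message == null)
            {
                break;
            }

            Console.WriteLine($"Receiver1 processed: {Encoding.UTF8.GetString(message.Body)}");

            // Hand the message over to Receiver 2, then complete the original.
            await sender.SendAsync(ForwardTo(message, "Receiver2"));
            await session.CompleteAsync(message.SystemProperties.LockToken);
        }

        await session.CloseAsync();
        await sender.CloseAsync();
        await sessionClient.CloseAsync();
    }
}
```

In this layout the sender would set `SessionId` to `Receiver1` on every message, and Receiver 2 would accept the `Receiver2` session and complete messages without forwarding. Since each receiver reads a single session, this trades the per-user concurrency of the original design for a strict Receiver 1 then Receiver 2 order.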

