I am new to Azure Service Bus and am trying to understand how a session-enabled queue works.
I have 3 console applications: 1 Message Sender app, which sends messages to the queue, and 2 Message Receiver apps.
The Azure Service Bus queue is session-enabled and I am trying to implement concurrent sessions.
Each user who runs the message sender console application will send a message to the queue and this message has to be received by both receiver console applications.
The receiver applications must receive/process each message in sequential order: Receiver1 first and then Receiver2. To achieve that, I tag each message
with the receiver application name when sending, and then in the receiver code I check that tag to decide whether this receiver application should process the message.
I am not receiving messages in sequential order within a session id.
Below is the code I am using for sending and receiving messages.
Message Sender code:
/// <summary>
/// Console entry point of the sender: runs <c>MainAsync</c> to completion.
/// </summary>
/// <param name="args">Command-line arguments; not used by this method.</param>
/// <returns>0 after <c>MainAsync</c> has finished.</returns>
public static int Main(string[] args)
{
    // The entry point is synchronous, so block until the async workflow is done.
    MainAsync().GetAwaiter().GetResult();

    // The method is declared to return int; without this the compiler reports
    // CS0161 (not all code paths return a value).
    return 0;
}
/// <summary>
/// Asynchronous driver of the sender: awaits <c>SendSessionMessagesAsync</c>.
/// </summary>
/// <returns>A task that completes when the messages have been sent.</returns>
public async Task MainAsync() => await SendSessionMessagesAsync();
/// <summary>
/// For every user in <c>userList</c>, sends one message per configured receiver
/// application name to the queue. All messages of a user share the session id
/// "Session " + UserId.
/// </summary>
/// <returns>A task that completes after the last message has been sent.</returns>
public async Task SendSessionMessagesAsync()
{
// Below userList coming from db
foreach (var user in userList)
{
// Payloads for this user, one entry per receiver application name.
List<ReceiverApplicationName> receiverApplicationNameList = new List<ReceiverApplicationName>();
// NOTE(review): the right-hand side below is elided in this snippet; it must yield the configured names.
var receiverApplicationNames = //getting the ReceiverApplicationName list from appsettings.json, [ "Receiver1", "Receiver2" ]
foreach (var item in receiverApplicationNames)
{
// Tag the payload with the user id and the name of the receiver it is meant for.
ReceiverApplicationName receiverApplicationName = new ReceiverApplicationName();
receiverApplicationName.UserId = user.Id;
receiverApplicationName.ApplicationName = item;
receiverApplicationNameList.Add(receiverApplicationName);
}
// Send the payloads one at a time, in the order they were added to the list.
foreach (var queueMessageItem in receiverApplicationNameList)
{
// The message body is the payload serialized to JSON and encoded as UTF-8 bytes.
var messageBody = JsonConvert.SerializeObject(queueMessageItem);
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
// Same session id for every message of the same user.
message.SessionId = "Session " + queueMessageItem.UserId;
Console.WriteLine($"Sending message for UserId: {queueMessageItem.UserId}, SessionId: {message.SessionId}, Message: {messageBody}");
// Awaited inside the loop, so the next message is not sent until this send has completed.
await queueClient.SendAsync(message); // 2 messages(Receiver1 and Receiver2) for each user
}
}
}
Message Receiver code(Receiver1):
/// <summary>
/// Console entry point of Receiver1: runs <c>MainAsync</c> to completion.
/// </summary>
/// <param name="args">Command-line arguments; not used by this method.</param>
/// <returns>0 after <c>MainAsync</c> has finished.</returns>
public static int Main(string[] args)
{
    // The entry point is synchronous, so block until the async workflow is done.
    MainAsync().GetAwaiter().GetResult();

    // The method is declared to return int; without this the compiler reports
    // CS0161 (not all code paths return a value).
    return 0;
}
/// <summary>
/// Registers the session handler, keeps the process alive until a key is
/// pressed, and then closes the queue client.
/// </summary>
/// <returns>A task that completes once the queue client has been closed.</returns>
public async Task MainAsync()
{
    RegisterOnSessionHandlerAndReceiveSessionMessages();

    // Registering the handler only starts the message pump and returns right away.
    // Without waiting here this method (and therefore Main) would finish
    // immediately and the process would exit before any message is handled.
    Console.WriteLine("Press any key to stop receiving messages...");
    Console.ReadKey();

    // Shut the client down cleanly so that in-flight handlers can finish.
    await queueClient.CloseAsync();
}
/// <summary>
/// Builds the session handler options and registers
/// <c>ProcessSessionMessagesAsync</c> as the session message handler on the queue client.
/// </summary>
public void RegisterOnSessionHandlerAndReceiveSessionMessages()
{
    // Exceptions raised by the message pump are routed to ExceptionReceivedHandler.
    var options = new SessionHandlerOptions(ExceptionReceivedHandler);
    options.MaxConcurrentSessions = 2;
    options.MessageWaitTimeout = TimeSpan.FromSeconds(5);
    // The handler completes messages itself via session.CompleteAsync.
    options.AutoComplete = false;

    queueClient.RegisterSessionHandler(ProcessSessionMessagesAsync, options);
}
/// <summary>
/// Logs the exception carried by the event arguments as an error.
/// </summary>
/// <param name="exceptionReceivedEventArgs">Event arguments holding the exception to log.</param>
/// <returns>An already-completed task.</returns>
public Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    var failure = exceptionReceivedEventArgs.Exception;
    _logger.LogError(failure, "Message handler encountered an exception");

    return Task.CompletedTask;
}
/// <summary>
/// Handles one session message: deserializes the JSON body and, when the
/// message is tagged for "Receiver1", logs it and completes it.
/// Messages tagged for any other application are neither logged nor completed here.
/// </summary>
/// <param name="session">Session the message belongs to; used to complete the message.</param>
/// <param name="message">The received message (UTF-8 encoded JSON body).</param>
/// <param name="token">Cancellation token supplied by the message pump; not used by this method.</param>
/// <returns>A task that completes when the message has been handled.</returns>
public async Task ProcessSessionMessagesAsync(IMessageSession session, Message message, CancellationToken token)
{
    // Decode once and reuse the text for both deserialization and logging.
    var body = Encoding.UTF8.GetString(message.Body);
    var result = JsonConvert.DeserializeObject<ReceiverApplicationName>(body);

    // The sender stores the target application in ApplicationName. A member named
    // ReceiverApplicationName cannot exist on a type with that same name (CS0542),
    // so the tag has to be read from ApplicationName.
    if (result.ApplicationName == "Receiver1")
    {
        Console.WriteLine($"Received message for UserId: {result.UserId}, Session: {session.SessionId}, Message: SequenceNumber: {message.SystemProperties.SequenceNumber}, Body:{body}, at: {DateTime.Now}");

        // AutoComplete is disabled in the handler options, so complete explicitly.
        await session.CompleteAsync(message.SystemProperties.LockToken);
    }
}
Message Receiver code(Receiver2):
/// <summary>
/// Console entry point of Receiver2: runs <c>MainAsync</c> to completion.
/// </summary>
/// <param name="args">Command-line arguments; not used by this method.</param>
/// <returns>0 after <c>MainAsync</c> has finished.</returns>
public static int Main(string[] args)
{
    // The entry point is synchronous, so block until the async workflow is done.
    MainAsync().GetAwaiter().GetResult();

    // The method is declared to return int; without this the compiler reports
    // CS0161 (not all code paths return a value).
    return 0;
}
/// <summary>
/// Registers the session handler, keeps the process alive until a key is
/// pressed, and then closes the queue client.
/// </summary>
/// <returns>A task that completes once the queue client has been closed.</returns>
public async Task MainAsync()
{
    RegisterOnSessionHandlerAndReceiveSessionMessages();

    // Registering the handler only starts the message pump and returns right away.
    // Without waiting here this method (and therefore Main) would finish
    // immediately and the process would exit before any message is handled.
    Console.WriteLine("Press any key to stop receiving messages...");
    Console.ReadKey();

    // Shut the client down cleanly so that in-flight handlers can finish.
    await queueClient.CloseAsync();
}
/// <summary>
/// Builds the session handler options and registers
/// <c>ProcessSessionMessagesAsync</c> as the session message handler on the queue client.
/// </summary>
public void RegisterOnSessionHandlerAndReceiveSessionMessages()
{
    // Exceptions raised by the message pump are routed to ExceptionReceivedHandler.
    var options = new SessionHandlerOptions(ExceptionReceivedHandler);
    options.MaxConcurrentSessions = 2;
    options.MessageWaitTimeout = TimeSpan.FromSeconds(5);
    // The handler completes messages itself via session.CompleteAsync.
    options.AutoComplete = false;

    queueClient.RegisterSessionHandler(ProcessSessionMessagesAsync, options);
}
/// <summary>
/// Logs the exception carried by the event arguments as an error.
/// </summary>
/// <param name="exceptionReceivedEventArgs">Event arguments holding the exception to log.</param>
/// <returns>An already-completed task.</returns>
public Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    var failure = exceptionReceivedEventArgs.Exception;
    _logger.LogError(failure, "Message handler encountered an exception");

    return Task.CompletedTask;
}
/// <summary>
/// Handles one session message: deserializes the JSON body and, when the
/// message is tagged for "Receiver2", logs it and completes it.
/// Messages tagged for any other application are neither logged nor completed here.
/// </summary>
/// <param name="session">Session the message belongs to; used to complete the message.</param>
/// <param name="message">The received message (UTF-8 encoded JSON body).</param>
/// <param name="token">Cancellation token supplied by the message pump; not used by this method.</param>
/// <returns>A task that completes when the message has been handled.</returns>
public async Task ProcessSessionMessagesAsync(IMessageSession session, Message message, CancellationToken token)
{
    // Decode once and reuse the text for both deserialization and logging.
    var body = Encoding.UTF8.GetString(message.Body);
    var result = JsonConvert.DeserializeObject<ReceiverApplicationName>(body);

    // The sender stores the target application in ApplicationName. A member named
    // ReceiverApplicationName cannot exist on a type with that same name (CS0542),
    // so the tag has to be read from ApplicationName.
    if (result.ApplicationName == "Receiver2")
    {
        Console.WriteLine($"Received message for UserId: {result.UserId}, Session: {session.SessionId}, Message: SequenceNumber: {message.SystemProperties.SequenceNumber}, Body:{body}, at: {DateTime.Now}");

        // AutoComplete is disabled in the handler options, so complete explicitly.
        await session.CompleteAsync(message.SystemProperties.LockToken);
    }
}
Results:
The timestamps of the messages received by Receiver1 and Receiver2 are not in sequence. For example, for UserId = 2, Receiver1 should receive its message before Receiver2 receives its message, or maybe I am doing something incorrect here.
Message sender:
Receiver1:
Receiver2: