C-C++ Code Example: Checking Transaction Boundaries
Applies To: Windows 10, Windows 7, Windows 8, Windows 8.1, Windows Server 2008, Windows Server 2008 R2, Windows Server 2012, Windows Server 2012 R2, Windows Server Technical Preview, Windows Vista
The following example reads all the messages in the queue to determine which message was the first message sent within a specific transaction, what subsequent message were sent as part of the transaction, and which message was the last message sent in the transaction.
The following three message properties are retrieved for each message.
To check transaction boundaries
Define the structures needed to retrieve messages.
Specify the properties of the message to be retrieved. (This example retrieves the message body and the three transaction boundary properties.)
Initialize the MQMSGPROPS structure.
Call MQOpenQueue to open the transactional queue with receive access. This example uses the format name supplied by the caller to open the queue.
Call MQReceiveMessage to retrieve the messages from the queue. For each message retrieved, its transaction boundary properties are checked to see if the boundaries of the transactions are not valid.
Call MQCloseQueue to close the destination queue.
Code Example
This example requires MSMQ 2.0 or later.
int XactVerify(
LPCWSTR wszQueueFormatName
)
{
// Validate the input string.
if (wszQueueFormatName == NULL)
{
return MQ_ERROR_INVALID_PARAMETER;
}
// Define the maximum number of properties and a property counter.
const int NUMBER_OF_PROPERTIES = 5 ; // Number of message properties
DWORD cPropId = 0; // Property counter
//Define an MQMSGPROPS structure.
MQMSGPROPS msgprops;
MSGPROPID aMsgPropId[NUMBER_OF_PROPERTIES];
PROPVARIANT aMsgPropVar[NUMBER_OF_PROPERTIES];
HRESULT aMsgStatus[NUMBER_OF_PROPERTIES];
HANDLE hQueue = NULL; // Queue handle
HRESULT hr = MQ_OK; // Return code
DWORD dwAccess; // Access mode of the queue
DWORD dwShareMode; // Share mode of the queue
DWORD dwRecAction; // Receive action
// Create a message body buffer.
ULONG ulBodyBufferSize = 1024;
UCHAR * pucBodyBuffer = NULL;
*pucBodyBuffer = (UCHAR *)malloc(ulBodyBufferSize);
if (pucBodyBuffer == NULL)
{
return MQ_ERROR_INSUFFICIENT_RESOURCES;
}
memset(pucBodyBuffer, 0, ulBodyBufferSize);
OBJECTID xid; // Transaction ID buffer
// Specify message properties.
aMsgPropId[cPropId] = PROPID_M_BODY_SIZE; // 0: Body size
aMsgPropVar[cPropId].vt = VT_UI4;
cPropId++;
aMsgPropId[cPropId] = PROPID_M_BODY; // 1: Body
aMsgPropVar[cPropId].vt = VT_VECTOR | VT_UI1;
aMsgPropVar[cPropId].caub.pElems = (UCHAR*)pucBodyBuffer;
aMsgPropVar[cPropId].caub.cElems = ulBodyBufferSize;
cPropId++;
aMsgPropId[cPropId] = PROPID_M_FIRST_IN_XACT; // 2: First
aMsgPropVar[cPropId].vt = VT_UI1;
cPropId++;
aMsgPropId[cPropId] = PROPID_M_LAST_IN_XACT; // 3: Last
aMsgPropVar[cPropId].vt = VT_UI1;
cPropId++;
aMsgPropId[cPropId] = PROPID_M_XACTID; // 4: Xact index
aMsgPropVar[cPropId].vt = VT_UI1 | VT_VECTOR;
aMsgPropVar[cPropId].caub.pElems = (PUCHAR)&xid;
aMsgPropVar[cPropId].caub.cElems = sizeof(OBJECTID);
cPropId++;
// Initialize the MQMSGPROPS structure.
msgprops.cProp = cPropId; // Number of message properties
msgprops.aPropID = aMsgPropId; // IDs of the message properties
msgprops.aPropVar = aMsgPropVar; // Values of the message properties
msgprops.aStatus = aMsgStatus; // Error reports
// Open the destination queue to read the message.
dwAccess = MQ_RECEIVE_ACCESS; // Access mode of the queue
dwShareMode = MQ_DENY_RECEIVE_SHARE; // Share mode of the queue
hr = MQOpenQueue(wszQueueFormatName, // Format name of the queue
dwAccess, // Access mode
dwShareMode, // Share mode
&hQueue // OUT: Handle to queue
);
if (FAILED(hr))
{
free (pucBodyBuffer);
return hr;
}
// Declare variables for the processing messages.
OBJECTID oidCurrentXID;
ZeroMemory(&oidCurrentXID, sizeof(OBJECTID));
BOOL fInsideXact = FALSE;
// Test the transaction boundaries and process the messages if no
// messages are lost.
for ( ; ; )
{
// Retrieve a message.
dwRecAction = MQ_ACTION_RECEIVE; // Receive action
hr = MQReceiveMessage(
hQueue, // Queue handle
0, // Maximum time (msec)
dwRecAction, // Receive action
&msgprops, // Message property structure
NULL, // No OVERLAPPED structure
NULL, // No callback function
NULL, // No cursor
NULL // Not in a transaction
);
if (FAILED(hr))
{
if ( hr != MQ_ERROR_IO_TIMEOUT)
{
fprintf(stderr, "Retrieving the message failed. Error: 0x%x\n",hr);
}
if (fInsideXact)
{
fprintf(stderr, "The last message(s) in transaction %x is missing.\n", oidCurrentXID.Uniquifier);
}
break;
}
// Test for the case of the first message in a transaction.
if (aMsgPropVar[2].bVal == MQMSG_FIRST_IN_XACT)
{
if (!fInsideXact)
{
// OK - starting a new transaction.
fInsideXact = TRUE;
oidCurrentXID = xid;
fprintf(stderr, "First message from transaction %x\n", xid.Uniquifier);
}
else
{
// Error condition: Starting a new transaction before
// the previous transaction has been closed.
fprintf(stderr, "***The last message(s) of transaction %x is missing.\n", oidCurrentXID.Uniquifier);
}
}
// Test for the case of the last message in a transaction.
if (aMsgPropVar[3].bVal == MQMSG_LAST_IN_XACT)
{
if (fInsideXact && xid.Uniquifier==oidCurrentXID.Uniquifier)
{
// OK - we are within the same transaction.
fInsideXact = FALSE;
fprintf(stderr, "Last message from transaction %x\n", xid.Uniquifier);
// At this point we have full data on one
// transaction. Process this information as
// needed by your application.
}
else
{
// Error condition: We have the last message of transaction X
// while within transaction Y.
if (xid.Uniquifier != oidCurrentXID.Uniquifier && oidCurrentXID.Uniquifier!=0)
{
fprintf(stderr, "***The last message(s) of transaction %x is missing\n", oidCurrentXID.Uniquifier);
}
if (!fInsideXact)
{
fprintf(stderr, "***The first message(s) of transaction %x is missing.\n", xid.Uniquifier);
}
}
}
// Test whether the message is an inner message of a current transaction.
if (aMsgPropVar[2].bVal == MQMSG_NOT_FIRST_IN_XACT &&
aMsgPropVar[3].bVal == MQMSG_NOT_LAST_IN_XACT)
{
if (xid.Uniquifier==oidCurrentXID.Uniquifier)
{
// OK - just inner message
fprintf(stderr, "Message from transaction %x\n", xid.Uniquifier);
}
else
{
// Error condition: We have an inner message of
// transaction X while within transaction Y.
if (oidCurrentXID.Uniquifier!=0)
{
fprintf(stderr, "***The last message(s) of transaction %x is missing.\n", oidCurrentXID.Uniquifier);
}
if (!fInsideXact)
{
fprintf(stderr, "***The first message(s) of transaction %x is missing.\n", xid.Uniquifier);
}
oidCurrentXID = xid;
fInsideXact = TRUE;
}
}
}
// Close the queue.
hr = MQCloseQueue(hQueue);
free (pucBodyBuffer);
return MQ_OK;
}