C-C++ Code Example: Correlation Identifier Filters

 

Applies To: Windows 10, Windows 7, Windows 8, Windows 8.1, Windows Server 2008, Windows Server 2008 R2, Windows Server 2012, Windows Server 2012 R2, Windows Server Technical Preview, Windows Vista

This example provides an application-defined function that peeks at the PROPID_M_CORRELATIONID property of each message in the queue and removes all the messages that have a specific correlation identifier that is provided by the caller.

Note

In this example, the cursor used to navigate the queue behaves differently if a message is, or is not, removed from the queue. If a message is removed from the queue, Message Queuing moves the cursor to the next message. If the message is not removed from the queue, the application must move the cursor.

To filter messages by correlation identifier

  1. Define the maximum number of message properties to be specified and the message property counter.

  2. Define the MQMSGPROPS structure.

  3. Specify PROPID_M_CORRELATIONID. The correlation ID is stored in an array of 20 unsigned characters, whose size can be defined by the constant PROPID_M_CORRELATIONID_SIZE.

  4. Initialize the MQMSGPROPS structure.

  5. Call MQOpenQueue to open the queue with receive access. Receive access allows the application to peek at or remove the messages in the queue.

  6. Call MQCreateCursor to create the cursor for navigating the queue. The returned cursor handle is used in the calls to read messages.

  7. Call MQReceiveMessage with peek access to peek at the first message in the queue. This call also initializes the cursor so that the cursor points at the first message in the queue.

  8. In a loop structure, continue to call MQReceiveMessage to peek at and remove the messages in the queue. Note that the receive action of each call varies depending on whether the previous message was removed from the queue.

    If the current message has the correct correlation identifier, set dwRecAction to MQ_ACTION_RECEIVE, call MQReceiveMessage to remove the message, then reset dwRecAction to MQ_ACTION_PEEK_CURRENT to process the next message. This will remove the current message and Message Queuing moves the cursor to the next message.

    If the message does not have the correct correlation identifier, set dwRecAction to MQ_ACTION_PEEK_NEXT to move the cursor and process the next message. In this case, the application must move the cursor to the next message.

  9. Call MQCloseCursor and MQCloseQueue to free the resources.

The following code example can be run on all versions of Message Queuing.

//
// FilterCorrelationId
//
// Opens the queue named by wszQueueFormatName with receive access, walks it
// with a cursor, and removes every message whose PROPID_M_CORRELATIONID
// value is byte-for-byte equal to the identifier supplied in pCorrelationId.
//
// Parameters:
//   wszQueueFormatName - Format name of the queue to filter. Must not be NULL.
//   pCorrelationId     - Counted byte array holding the correlation identifier
//                        to match. Must not be NULL, and pElems must not be
//                        NULL when cElems is nonzero.
//
// Return value:
//   MQ_ERROR_INVALID_PARAMETER if an argument is rejected by the checks above.
//   The failing HRESULT if MQOpenQueue, MQCreateCursor, or the initial peek
//   fails (this includes the time-out returned by the initial peek when there
//   is no message to peek at).
//   Otherwise the first failure reported while filtering or while closing the
//   cursor and the queue, or the result of MQCloseQueue if everything
//   succeeded. A time-out on a peek inside the loop is treated as the normal
//   end of the queue and is not reported as an error.
//
HRESULT FilterCorrelationId(
    LPCWSTR wszQueueFormatName,
    CAUB * pCorrelationId
    )
{
    // Validate the input parameters.
    if ((wszQueueFormatName == NULL) || (pCorrelationId == NULL))
    {
        return MQ_ERROR_INVALID_PARAMETER;
    }

    // A nonzero element count without a buffer would make memcmp read
    // through a null pointer, so reject it up front.
    if ((pCorrelationId->cElems != 0) && (pCorrelationId->pElems == NULL))
    {
        return MQ_ERROR_INVALID_PARAMETER;
    }

    // Define the maximum number of properties and a property counter.
    // An enum constant is a true constant expression in both C and C++,
    // so the arrays below are ordinary fixed-size arrays.
    enum { NUMBEROFPROPERTIES = 5 };                 // Number of properties
    DWORD cPropId = 0;                               // Property counter

    // Define the MQMSGPROPS structure.
    MQMSGPROPS msgprops;
    MSGPROPID aMsgPropId[NUMBEROFPROPERTIES];
    PROPVARIANT aMsgPropVar[NUMBEROFPROPERTIES];
    HRESULT aMsgStatus[NUMBEROFPROPERTIES];

    HANDLE hQueue = NULL;                            // Queue handle
    HANDLE hCursor = NULL;                           // Cursor handle
    HRESULT hr = MQ_OK;                              // Result of the latest call
    HRESULT hrResult = MQ_OK;                        // Value returned to the caller
    DWORD dwRecAction = MQ_ACTION_PEEK_CURRENT;      // Receive action mode

    // Specify the message properties to be retrieved.
    UCHAR aCorrelationId[PROPID_M_CORRELATIONID_SIZE];
    aMsgPropId[cPropId] = PROPID_M_CORRELATIONID;    // Property ID
    aMsgPropVar[cPropId].vt = VT_VECTOR | VT_UI1;    // Type indicator
    aMsgPropVar[cPropId].caub.cElems = PROPID_M_CORRELATIONID_SIZE;
    aMsgPropVar[cPropId].caub.pElems = aCorrelationId;
    cPropId++;

    // Initialize the MQMSGPROPS structure.
    msgprops.cProp = cPropId;                        // Number of properties
    msgprops.aPropID = aMsgPropId;                   // IDs of properties
    msgprops.aPropVar = aMsgPropVar;                 // Values of properties
    msgprops.aStatus = aMsgStatus;                   // Error reports

    // Open the queue to filter messages.
    hr = MQOpenQueue(
             wszQueueFormatName,                     // Format name of queue
             MQ_RECEIVE_ACCESS,                      // Access mode
             MQ_DENY_RECEIVE_SHARE,                  // Share mode
             &hQueue                                 // OUT: Handle of queue
             );
    if (FAILED(hr))
    {
        return hr;
    }

    // Create the cursor used to navigate through the queue.
    hr = MQCreateCursor(
             hQueue,                                 // Queue handle
             &hCursor                                // OUT: Handle to cursor
             );
    if (FAILED(hr))
    {
        MQCloseQueue(hQueue);
        return hr;
    }

    // Peek at first message in the queue.
    hr = MQReceiveMessage(
             hQueue,                                 // Queue handle
             0,                                      // Max time (msec)
             MQ_ACTION_PEEK_CURRENT,                 // Receive action
             &msgprops,                              // Message property structure
             NULL,                                   // No OVERLAPPED structure
             NULL,                                   // No callback function
             hCursor,                                // Cursor handle
             MQ_NO_TRANSACTION                       // Not in a transaction
             );
    if (FAILED(hr))
    {
        MQCloseCursor(hCursor);
        MQCloseQueue(hQueue);
        return hr;
    }

    // Filter messages from queue. Each pass examines the message that was
    // just peeked at, optionally removes it, and then peeks at the next one.
    for (;;)
    {
        if ((msgprops.aPropVar[0].caub.cElems == pCorrelationId->cElems) &&
            (memcmp(msgprops.aPropVar[0].caub.pElems,
                    pCorrelationId->pElems,
                    pCorrelationId->cElems) == 0))
        {
            // The identifier matches: remove the message from the queue.
            hr = MQReceiveMessage(
                     hQueue,                         // Queue handle
                     0,                              // Max time (msec)
                     MQ_ACTION_RECEIVE,              // Receive action
                     &msgprops,                      // Message property structure
                     NULL,                           // No OVERLAPPED structure
                     NULL,                           // No callback function
                     hCursor,                        // Cursor handle
                     MQ_NO_TRANSACTION               // Not in a transaction
                     );
            if (FAILED(hr))
            {
                // Failing to remove a message that was just peeked at is a
                // real error; keep it so that the cleanup below cannot
                // overwrite it.
                hrResult = hr;
                break;
            }

            //
            // Process the message.
            //

            // After a removal the cursor has already been moved to the next
            // message, so the next peek must not advance it again.
            dwRecAction = MQ_ACTION_PEEK_CURRENT;
        }
        else
        {
            // Nothing was removed, so the application must advance the cursor.
            dwRecAction = MQ_ACTION_PEEK_NEXT;
        }

        hr = MQReceiveMessage(
                 hQueue,                             // Queue handle
                 0,                                  // Max time (msec)
                 dwRecAction,                        // Receive action
                 &msgprops,                          // Message property structure
                 NULL,                               // No OVERLAPPED structure
                 NULL,                               // No callback function
                 hCursor,                            // Cursor handle
                 MQ_NO_TRANSACTION                   // Not in a transaction
                 );
        if (FAILED(hr))
        {
            // With a zero time-out, a time-out on a peek means there is no
            // further message to look at; that is the normal way out of the
            // loop. Any other failure is reported to the caller.
            if (hr != MQ_ERROR_IO_TIMEOUT)
            {
                hrResult = hr;
            }
            break;
        }
    }

    // Close cursor and queue. Both are always closed; the first failure
    // (from filtering or from closing) is the one that is returned.
    hr = MQCloseCursor(hCursor);
    if (SUCCEEDED(hrResult) && FAILED(hr))
    {
        hrResult = hr;
    }

    hr = MQCloseQueue(hQueue);
    if (SUCCEEDED(hrResult))
    {
        hrResult = hr;
    }

    return hrResult;
}