Freigeben über


C-C++ Code Example: Reading Messages Synchronously

 

Applies To: Windows 10, Windows 7, Windows 8, Windows 8.1, Windows Server 2008, Windows Server 2008 R2, Windows Server 2012, Windows Server 2012 R2, Windows Server Technical Preview, Windows Vista

The following example provides an application-defined function that synchronously reads each message in a known queue, removing each message as it is read.

For information about reading messages synchronously, see Synchronous Reading.

To read messages synchronously

  1. Define the maximum number of queue properties to be specified and the queue property counter.

  2. Define an MQMSGPROPS structure.

  3. Specify the message properties to retrieve. This example retrieves PROPID_M_LABEL, PROPID_M_LABEL_LEN, PROPID_M_BODY, PROPID_M_BODY_SIZE, and PROPID_M_BODY_TYPE.

Note

The maximum label length, including the string-terminating character, is MQ_MAX_MSG_LABEL_LEN (250 Unicode characters). This value must be reset for each message.

  1. Initialize the MQMSGPROPS structure.

  2. Create the format name of the queue. This format name is used to open the queue. Note that the input strings used to create the format name are validated in this function, but it is the responsibility of the caller to ensure that these strings are null-terminated.

  3. Call MQOpenQueue to open the queue with receive access. Receive access allows the application to peek at or remove the messages in the queue.

  4. In a loop, call MQReceiveMessage to retrieve the messages in the queue. If the body buffer is too small for any of the messages, reallocate memory for it.

  5. Call MQCloseQueue and free the resources.

Code Example

The following code example can be run on all versions of Message Queuing.

HRESULT ReadingDestQueue(  
                         WCHAR * wszQueueName,  
                         WCHAR * wszComputerName  
                         )  
{  
  
  // Define the required constants and variables.  
  const int NUMBEROFPROPERTIES = 5;  
  DWORD cPropId = 0;  
  HRESULT hr = MQ_OK;                                 // Return code  
  HANDLE hQueue = NULL;                               // Queue handle  
  ULONG ulBufferSize = 2;  
  
  // Define an MQMSGPROPS structure.  
  MQMSGPROPS msgprops;  
  MSGPROPID aMsgPropId[NUMBEROFPROPERTIES];  
  MQPROPVARIANT aMsgPropVar[NUMBEROFPROPERTIES];  
  HRESULT aMsgStatus[NUMBEROFPROPERTIES];  
  
  // Specify the message properties to be retrieved.  
  aMsgPropId[cPropId] = PROPID_M_LABEL_LEN;           // Property ID  
  aMsgPropVar[cPropId].vt = VT_UI4;                   // Type indicator  
  aMsgPropVar[cPropId].ulVal = MQ_MAX_MSG_LABEL_LEN;  // Length of label  
  cPropId++;  
  
  WCHAR wszLabelBuffer[MQ_MAX_MSG_LABEL_LEN];         // Label buffer  
  aMsgPropId[cPropId] = PROPID_M_LABEL;               // Property ID  
  aMsgPropVar[cPropId].vt = VT_LPWSTR;                // Type indicator  
  aMsgPropVar[cPropId].pwszVal = wszLabelBuffer;      // Label buffer  
  cPropId++;  
  
  UCHAR * pucBodyBuffer = NULL;  
  pucBodyBuffer = (UCHAR*)malloc(ulBufferSize);   
  if (pucBodyBuffer == NULL)  
  {  
    return MQ_ERROR_INSUFFICIENT_RESOURCES;  
  }  
  memset(pucBodyBuffer, 0, ulBufferSize);  
  aMsgPropId[cPropId] = PROPID_M_BODY_SIZE;           // Property ID  
  aMsgPropVar[cPropId].vt = VT_NULL;                  // Type indicator  
  cPropId++;  
  
  aMsgPropId[cPropId] = PROPID_M_BODY;                // Property ID  
  aMsgPropVar[cPropId].vt = VT_VECTOR | VT_UI1;       // Type indicator  
  aMsgPropVar[cPropId].caub.pElems = (UCHAR*)pucBodyBuffer;  // Body buffer  
  aMsgPropVar[cPropId].caub.cElems = ulBufferSize;    // Buffer size  
  cPropId++;  
  
  aMsgPropId[cPropId] = PROPID_M_BODY_TYPE;           // Property ID  
  aMsgPropVar[cPropId].vt = VT_NULL;                  // Type indicator  
  cPropId++;  
  
  // Initialize the MQMSGPROPS structure.  
  msgprops.cProp = cPropId;                           // Number of message properties  
  msgprops.aPropID = aMsgPropId;                      // IDs of the message properties  
  msgprops.aPropVar = aMsgPropVar;                    // Values of the message properties  
  msgprops.aStatus = aMsgStatus;                      // Error reports  
  
  // Validate the input strings.  
  if (wszQueueName == NULL || wszComputerName == NULL)  
  {  
    return MQ_ERROR_INVALID_PARAMETER;  
  }  
  
  // Create a direct format name.  
  WCHAR * wszFormatName = NULL;  
  DWORD dwFormatNameLength = 0;  
  dwFormatNameLength = wcslen(wszQueueName) + wcslen(wszComputerName) + 12;  
  wszFormatName = new WCHAR[dwFormatNameLength];  
  if (wszFormatName == NULL)  
  {  
    return MQ_ERROR_INSUFFICIENT_RESOURCES;  
  }  
  memset(wszFormatName, 0, dwFormatNameLength);  
  // ************************************  
  // You must concatenate "DIRECT=OS:", wszComputerName, "\",   
  // and wszQueueName into the wszFormatName buffer.  
  // wszFormatName = "DIRECT=OS:" + wszComputerName + "\" +     
  // wszQueueName  
  // If the format name is too long for the buffer, return FALSE.  
  // ************************************  
  
  // Open the queue with receive access.  
  hr = MQOpenQueue(  
                   wszFormatName,                      // Format name of the queue  
                   MQ_RECEIVE_ACCESS,                  // Access mode  
                   MQ_DENY_NONE,                       // Share mode  
                   &hQueue                             // OUT: Queue handle  
                   );  
  // Free the memory that was allocated for the format name string.  
  if (wszFormatName)  
  {  
    delete [] wszFormatName;  
  }  
  
  // Handle any error returned by MQOpenQueue.  
  if (FAILED(hr))  
  {  
    return hr;  
  }  
  
  for ( ; ; )  
  {  
    aMsgPropVar[0].ulVal = MQ_MAX_MSG_LABEL_LEN;  
    hr = MQReceiveMessage(  
                          hQueue,                     // Queue handle  
                          1000,                       // Max time to (msec) to receive the message  
                          MQ_ACTION_RECEIVE,          // Receive action  
                          &msgprops,                  // Message property structure  
                          NULL,                       // No OVERLAPPED structure  
                          NULL,                       // No callback function  
                          NULL,                       // No cursor handle  
                          MQ_NO_TRANSACTION           // Not in a transaction  
                          );  
  
    if (hr == MQ_ERROR_BUFFER_OVERFLOW)  
    {  
      ulBufferSize = aMsgPropVar[2].ulVal*sizeof(UCHAR);  
      pucBodyBuffer = (UCHAR*)realloc(pucBodyBuffer, ulBufferSize);  
      if (pucBodyBuffer == NULL)  
      {  
        return MQ_ERROR_INSUFFICIENT_RESOURCES;  
      }  
      memset(pucBodyBuffer, 0, ulBufferSize);  
      aMsgPropVar[3].caub.pElems = (UCHAR*)pucBodyBuffer;  
      aMsgPropVar[3].caub.cElems = ulBufferSize;  
      continue;  
    }  
  
    if (FAILED(hr))  
    {  
      wprintf(L"No messages. Closing queue\n");  
      break;  
    }  
  
    // If the message contains a label, print it.  
    if (msgprops.aPropVar[0].ulVal == 0)  
    {  
      wprintf(L"Removed message from queue.\n");  
    }  
    else  
    {  
      wprintf(L"Removed message '%s' from queue.\n", wszLabelBuffer);  
    }  
  
    // If the message body is a string, display it.  
    if (msgprops.aPropVar[4].ulVal == VT_BSTR)  
    {  
      wprintf(L"Body: %s", (WCHAR*)pucBodyBuffer);  
      wprintf(L"\n");  
    }  
  }  
  
  // Close the queue and free the memory allocated for the body buffer.  
  hr = MQCloseQueue(hQueue);  
  free(pucBodyBuffer);  
  
  return hr;  
}