Routing Messages to the MQ Dead Letter Queue

I recently had a fun problem to solve for a client of mine. Essentially, they wanted to route messages to the MQ Series dead letter queue and have the MQ dead letter queue handler move those messages to the queue named in the dead letter header.

The first problem was getting the message onto the MQ dead letter queue; John and Anil gave some great pointers here. It turns out that to send a message to the dead letter queue, the message needs to be pre-pended with the dead letter header (DLH), and the MQMD_Format property needs to be set to "MQDEAD  " (MQDEAD padded with two blanks to eight characters) so that MQ knows to expect the DLH.

Serializing the Dead Letter Header

To achieve this I wrote a DLH utilities component that allows the various fields of the DLH to be set. The component also serializes the DLH into a byte[] so that it may be pre-pended to the message. It is called from a custom send pipeline component, which allows the key fields to be set at design time and also writes the MQMD_Format property to the message context.

The format of the MQ DLH struct is as follows:

char[] strucId = new char[4];        // Structure identifier - MQCHAR4 StrucId
int version = 1;                     // Structure version number
int reason;                          // Reason message arrived on dead-letter queue
char[] destQName = new char[48];     // Name of original destination queue
char[] destQMgrName = new char[48];  // Name of original destination queue manager
int encoding;                        // Numeric encoding of data that follows MQDLH
int codedCharSetId;                  // Char set identifier of data that follows MQDLH
char[] format = new char[8];         // Format name of data that follows MQDLH
int putApplType;                     // Type of app that put message on dead-letter queue
char[] putApplName = new char[28];   // Name of app that put message on dead-letter queue
char[] putDate = new char[8];        // Date when message was put on dead-letter queue
char[] putTime = new char[8];        // Time when message was put on dead-letter queue

This struct needs to be serialised into a byte[] and pre-pended to the message:

public byte[] SerializeHeader()
{
      // MQDLH is 172 bytes: 152 bytes of character fields plus five 4-byte integers.
      byte[] header = new byte[172];
      int index = 0;

      // Character fields: each char[] is allocated at its fixed width, and ASCII
      // writes exactly one byte per character, so a field cannot overrun its slot.
      // Integer fields: BitConverter writes them in the byte order of this machine.

      // strucId
      System.Text.Encoding.ASCII.GetBytes(strucId, 0, strucId.Length, header, index);
      index += 4;

      // version
      BitConverter.GetBytes(version).CopyTo(header, index);
      index += 4;

      // reason
      BitConverter.GetBytes(reason).CopyTo(header, index);
      index += 4;

      // destQName
      System.Text.Encoding.ASCII.GetBytes(destQName, 0, destQName.Length, header, index);
      index += 48;

      // destQMgrName
      System.Text.Encoding.ASCII.GetBytes(destQMgrName, 0, destQMgrName.Length, header, index);
      index += 48;

      // encoding
      BitConverter.GetBytes(encoding).CopyTo(header, index);
      index += 4;

      // codedCharSetId
      BitConverter.GetBytes(codedCharSetId).CopyTo(header, index);
      index += 4;

      // format
      System.Text.Encoding.ASCII.GetBytes(format, 0, format.Length, header, index);
      index += 8;

      // putApplType
      BitConverter.GetBytes(putApplType).CopyTo(header, index);
      index += 4;

      // putApplName
      System.Text.Encoding.ASCII.GetBytes(putApplName, 0, putApplName.Length, header, index);
      index += 28;

      // putDate - Format: yyyymmdd
      System.Text.Encoding.ASCII.GetBytes(putDate, 0, putDate.Length, header, index);
      index += 8;

      // putTime - Format: hhmmss00
      System.Text.Encoding.ASCII.GetBytes(putTime, 0, putTime.Length, header, index);
      index += 8;

      return header;
}
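
The serializer copies each char[] field as-is, so the fields need to be filled to their fixed widths before it runs. The post does not show how that was done; the following is only a minimal sketch of one way to do it, assuming short values are padded with blanks on the right (the usual convention for MQ character fields). The class name DlhFieldHelper and its methods are hypothetical, not part of the original component.

```csharp
using System;
using System.Globalization;

// Hypothetical helper (not from the original component): builds the
// fixed-width char[] values that the DLH fields expect.
public static class DlhFieldHelper
{
    // Returns exactly `length` characters: long values are truncated,
    // short ones are padded on the right with blanks (an assumption).
    public static char[] ToFixedField(string value, int length)
    {
        if (length < 0) throw new ArgumentOutOfRangeException("length");
        string s = value ?? string.Empty;
        if (s.Length > length) s = s.Substring(0, length);
        return s.PadRight(length, ' ').ToCharArray();
    }

    // putDate: eight characters in yyyymmdd form.
    public static char[] ToPutDate(DateTime timestamp)
    {
        return timestamp.ToString("yyyyMMdd", CultureInfo.InvariantCulture).ToCharArray();
    }

    // putTime: eight characters in hhmmss00 form, with the last two
    // characters fixed at "00" as in the format comment above.
    public static char[] ToPutTime(DateTime timestamp)
    {
        string hhmmss = timestamp.ToString("HHmmss", CultureInfo.InvariantCulture);
        return (hhmmss + "00").ToCharArray();
    }
}
```

With a helper like this, destQName could be set with DlhFieldHelper.ToFixedField(queueName, 48) and putApplName with a width of 28, so every field always has the length the serializer assumes.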

Pipeline Component

The MQ adapter does not directly support setting the MQMD_Format property, so the custom send pipeline component is responsible for pre-pending the message data stream with the DLH and for writing the message context property, which the MQ adapter then sets on the MQ message:

private static PropertyBase mqmd_Format = new MQSeries.MQMD_Format();

// MQFMT_DEAD_LETTER_HEADER: "MQDEAD" padded with blanks to eight characters.
private const string MQFMT_DEAD_LETTER_HEADER = "MQDEAD  ";

// Set MQMD_Format property to MQFMT_DEAD_LETTER_HEADER - to indicate the message
// is prepended with the dead letter header...
inmsg.Context.Write( mqmd_Format.Name.Name,
                     mqmd_Format.Name.Namespace,
                     MQFMT_DEAD_LETTER_HEADER );

// Pre-pend the message body with the MQS dead letter header...
inmsg.BodyPart.Data = DeadLetterHelper.BuildDeadLetterMessage(
                     inmsg.BodyPart.GetOriginalDataStream(),
                     _DestinationQueue,
                     _QueueManager,
                     _ApplicationName );

All that is required then is for the send port to be configured to send the message to the SYSTEM.DEAD.LETTER.QUEUE.
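
The body of DeadLetterHelper.BuildDeadLetterMessage is not shown in this post. As a rough illustration of the pre-pending step only, here is a minimal sketch, assuming the header has already been serialized to a byte[] and that the message is small enough to buffer in memory. StreamPrepender and PrependHeader are hypothetical names, not the helper used above.

```csharp
using System;
using System.IO;

// Hypothetical sketch of the pre-pending step: returns a new stream
// containing the header bytes followed by the original body bytes.
public static class StreamPrepender
{
    public static Stream PrependHeader(byte[] header, Stream body)
    {
        if (header == null) throw new ArgumentNullException("header");
        if (body == null) throw new ArgumentNullException("body");

        MemoryStream combined = new MemoryStream();

        // Header first...
        combined.Write(header, 0, header.Length);

        // ...then the body, copied in chunks from its current position.
        byte[] buffer = new byte[4096];
        int read;
        while ((read = body.Read(buffer, 0, buffer.Length)) > 0)
        {
            combined.Write(buffer, 0, read);
        }

        // Rewind so the next reader starts at the header.
        combined.Position = 0;
        return combined;
    }
}
```

Buffering the whole message keeps the sketch short but would not suit large messages; a forward-only wrapper stream that serves the header bytes first and then reads through to the body would avoid the copy.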

Dead Letter Handler

Once the message is on the dead letter queue, the dead letter handler (runmqdlq.exe) can be configured to take the message off the queue and put it on the destination queue defined in the DLH. This proved to be a little tricky to get working; thanks to Jason for his help with it.

runmqdlq.exe SYSTEM.DEAD.LETTER.QUEUE QM_demoappserver < qrule.rul

The handler may be fed a rule (qrule.rul). The example here removes the DLH and puts the message on the queue specified in the DLH. One thing to watch out for: the rule below needs to have a CRLF at the end!

INPUTQM(QM_demoappserver) INPUTQ('SYSTEM.DEAD.LETTER.QUEUE') WAIT(NO)

ACTION(FWD) FWDQ(&DESTQ) HEADER(NO)